Spark RDD Concepts and Examples
Introduction
Spark turns data into something called an RDD (Resilient Distributed Dataset) and works on it in that form. There are two kinds of operations for processing an RDD: the first is the transformation, the second is the action.
A transformation operates on an existing RDD and produces a new RDD. An action returns the result data held in an RDD. One important point: applying transformations to an RDD does nothing by itself; those transformations do not run until an action is executed.
This comes from the way Spark works. Until an action occurs, Spark only keeps metadata about the transformations that have been applied to the RDD, and when an action finally occurs it processes the RDD based on that metadata. This lazy evaluation helps Spark use memory efficiently.
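Before moving on, here is a rough sketch of what this lazy behavior means in practice (this is not from the original example; the class name and input strings are made up). The lambda passed to map() does not run until an action such as count() is called, which the print statement inside it makes visible.

package sparks;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical class, only for illustrating lazy evaluation.
public class LazyEvalSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("Lazy Eval Sketch").setMaster("local"));

        JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b", "c"));

        // Transformation only: the lambda below does NOT run here,
        // so nothing is printed at this point.
        JavaRDD<String> upper = lines.map(s -> {
            System.out.println("map() is running on: " + s);
            return s.toUpperCase();
        });

        System.out.println("No map() output yet...");

        // Action: count() submits a job, and only now do the map() prints appear.
        System.out.println("count = " + upper.count());

        sc.stop();
    }
}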
This post covers the basics of using an RDD, and starting from it I plan to continue with posts on Spark SQL, Spark Cluster (Standalone), Spark Streaming, and so on.
The examples in this post use Java 1.8 and Spark 2.0.1.
How to use an RDD
1. Applying only a transformation
As explained in the introduction, let's first confirm that transformations applied to an RDD do not run until an action is applied.
package sparks;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDEx {
    public static void main(String[] args) {
        // Run locally in a single JVM
        SparkConf conf = new SparkConf().setAppName("RDD Ex").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> testList = new ArrayList<>();
        testList.add("test1");
        testList.add("test2");
        testList.add("test3");

        // Create an RDD from the in-memory list
        JavaRDD<String> inputRDD = sc.parallelize(testList);

        // Transformation only: map() is recorded but not executed yet
        JavaRDD<String> newRDD = inputRDD.map((line) -> line + "Transformate");
    }
}
Running the code above produces only the driver startup and shutdown logs below; no job, stage, or task is ever submitted, because nothing but a transformation was applied.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/24 00:32:38 INFO SparkContext: Running Spark version 2.0.1
17/05/24 00:32:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 00:32:38 INFO SecurityManager: Changing view acls to: birdhead
17/05/24 00:32:38 INFO SecurityManager: Changing modify acls to: birdhead
17/05/24 00:32:38 INFO SecurityManager: Changing view acls groups to:
17/05/24 00:32:38 INFO SecurityManager: Changing modify acls groups to:
17/05/24 00:32:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(birdhead); groups with view permissions: Set(); users with modify permissions: Set(birdhead); groups with modify permissions: Set()
17/05/24 00:32:39 INFO Utils: Successfully started service 'sparkDriver' on port 54110.
17/05/24 00:32:39 INFO SparkEnv: Registering MapOutputTracker
17/05/24 00:32:39 INFO SparkEnv: Registering BlockManagerMaster
17/05/24 00:32:39 INFO DiskBlockManager: Created local directory at /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/blockmgr-f8cffcf9-8fa2-4497-9e7a-35e64f058e6d
17/05/24 00:32:39 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/24 00:32:39 INFO SparkEnv: Registering OutputCommitCoordinator
17/05/24 00:32:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/05/24 00:32:39 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://***.***.***.***:4040
17/05/24 00:32:39 INFO Executor: Starting executor ID driver on host localhost
17/05/24 00:32:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54111.
17/05/24 00:32:39 INFO NettyBlockTransferService: Server created on 192.168.219.108:54111
17/05/24 00:32:39 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ***.***.***.***, 54111)
17/05/24 00:32:39 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.219.108:54111 with 2004.6 MB RAM, BlockManagerId(driver, 192.168.219.108, 54111)
17/05/24 00:32:39 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ***.***.***.***, 54111)
17/05/24 00:32:39 INFO SparkContext: Invoking stop() from shutdown hook
17/05/24 00:32:39 INFO SparkUI: Stopped Spark web UI at http://***.***.***.***:4040
17/05/24 00:32:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/24 00:32:39 INFO MemoryStore: MemoryStore cleared
17/05/24 00:32:39 INFO BlockManager: BlockManager stopped
17/05/24 00:32:39 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/24 00:32:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/24 00:32:39 INFO SparkContext: Successfully stopped SparkContext
17/05/24 00:32:39 INFO ShutdownHookManager: Shutdown hook called
17/05/24 00:32:39 INFO ShutdownHookManager: Deleting directory /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/spark-c75423d7-bb80-4e70-87f4-2412ca929bbd
2. Applying a transformation and an action
Now add an action, collect(), to the same code and run it again.

package sparks;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDEx {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD Ex").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> testList = new ArrayList<>();
        testList.add("test1");
        testList.add("test2");
        testList.add("test3");

        JavaRDD<String> inputRDD = sc.parallelize(testList);

        // Transformation: still lazy at this point
        JavaRDD<String> newRDD = inputRDD.map((line) -> line + "Transformate");

        // Action: collect() pulls the results back to the driver,
        // which forces the map() transformation above to actually run
        List<String> strList = newRDD.collect();
        for (String str : strList)
            System.out.println(str);
    }
}
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/24 00:45:46 INFO SparkContext: Running Spark version 2.0.1
17/05/24 00:45:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 00:45:46 INFO SecurityManager: Changing view acls to: birdhead
17/05/24 00:45:46 INFO SecurityManager: Changing modify acls to: birdhead
17/05/24 00:45:46 INFO SecurityManager: Changing view acls groups to:
17/05/24 00:45:46 INFO SecurityManager: Changing modify acls groups to:
17/05/24 00:45:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(birdhead); groups with view permissions: Set(); users with modify permissions: Set(birdhead); groups with modify permissions: Set()
17/05/24 00:45:47 INFO Utils: Successfully started service 'sparkDriver' on port 54155.
17/05/24 00:45:47 INFO SparkEnv: Registering MapOutputTracker
17/05/24 00:45:47 INFO SparkEnv: Registering BlockManagerMaster
17/05/24 00:45:47 INFO DiskBlockManager: Created local directory at /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/blockmgr-a4bf01fe-0323-4361-b8be-614472214249
17/05/24 00:45:47 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/24 00:45:47 INFO SparkEnv: Registering OutputCommitCoordinator
17/05/24 00:45:47 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/05/24 00:45:47 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://***.***.***.***:4040
17/05/24 00:45:47 INFO Executor: Starting executor ID driver on host localhost
17/05/24 00:45:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54156.
17/05/24 00:45:47 INFO NettyBlockTransferService: Server created on ***.***.***.***:54156
17/05/24 00:45:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO BlockManagerMasterEndpoint: Registering block manager ***.***.***.***:54156 with 2004.6 MB RAM, BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO SparkContext: Starting job: collect at RDDEx.java:25
17/05/24 00:45:47 INFO DAGScheduler: Got job 0 (collect at RDDEx.java:25) with 1 output partitions
17/05/24 00:45:47 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDDEx.java:25)
17/05/24 00:45:47 INFO DAGScheduler: Parents of final stage: List()
17/05/24 00:45:47 INFO DAGScheduler: Missing parents: List()
17/05/24 00:45:47 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at RDDEx.java:23), which has no missing parents
17/05/24 00:45:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 2004.6 MB)
17/05/24 00:45:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1675.0 B, free 2004.6 MB)
17/05/24 00:45:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ***.***.***.***:54156 (size: 1675.0 B, free: 2004.6 MB)
17/05/24 00:45:47 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
/******** Different part 1 ********/
17/05/24 00:45:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at RDDEx.java:23)
17/05/24 00:45:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/05/24 00:45:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5412 bytes)
17/05/24 00:45:48 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/05/24 00:45:48 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 816 bytes result sent to driver
17/05/24 00:45:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 61 ms on localhost (1/1)
17/05/24 00:45:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
/******** Different part 2 ********/
17/05/24 00:45:48 INFO DAGScheduler: ResultStage 0 (collect at RDDEx.java:25) finished in 0.074 s
17/05/24 00:45:48 INFO DAGScheduler: Job 0 finished: collect at RDDEx.java:25, took 0.220065 s
test1Transformate
test2Transformate
test3Transformate
17/05/24 00:45:48 INFO SparkContext: Invoking stop() from shutdown hook
17/05/24 00:45:48 INFO SparkUI: Stopped Spark web UI at http://***.***.***.***:4040
17/05/24 00:45:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/24 00:45:48 INFO MemoryStore: MemoryStore cleared
17/05/24 00:45:48 INFO BlockManager: BlockManager stopped
17/05/24 00:45:48 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/24 00:45:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/24 00:45:48 INFO SparkContext: Successfully stopped SparkContext
17/05/24 00:45:48 INFO ShutdownHookManager: Shutdown hook called
17/05/24 00:45:48 INFO ShutdownHookManager: Deleting directory /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/spark-980aa1a2-70af-4e57-8c42-d85e65c34a93
Looking at the /******** Different part ********/ markers in the log, you can see that this time a job, a stage, and a task were submitted and executed: the transformation only ran once the action was invoked, along with the printed results test1Transformate, test2Transformate, and test3Transformate.
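For reference, collect() is only one of several actions; any action triggers the pending transformations in the same way. The sketch below is a rough illustration (it reuses the newRDD variable from the example above and is not part of the original post).

// Each call below is an action and therefore submits its own Spark job.
long count = newRDD.count();        // number of elements
String first = newRDD.first();      // first element
List<String> top2 = newRDD.take(2); // first two elements

// collect() brings the entire RDD back to the driver, so it should only
// be used when the result is small enough to fit in driver memory.
List<String> all = newRDD.collect();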
Conclusion
In this post we took a quick look at Spark's RDD. Spark provides several ways to create an RDD, such as reading files or reading from a database. The key is to be able to see how the two kinds of operations applied to an RDD, transformations and actions, actually behave.
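For example, an RDD can also be created from a text file with textFile() instead of parallelize(). The sketch below is only an illustration (the file path and filter condition are made up; sc is a JavaSparkContext as in the examples above), and it follows the same transformation-then-action pattern.

// Hypothetical path; the file is not read here yet.
JavaRDD<String> logLines = sc.textFile("/tmp/sample.log");

// Transformation: lazily keep only the lines containing "ERROR".
JavaRDD<String> errorLines = logLines.filter(line -> line.contains("ERROR"));

// Action: the file is only actually read when count() is called.
System.out.println("error lines: " + errorLines.count());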