Introduction

Spark represents data as RDDs (Resilient Distributed Datasets). There are two kinds of operations for working with an RDD: transformations and actions.

A transformation operates on an existing RDD and produces a new RDD. An action computes a result from an RDD and returns it to the driver. One important point: no matter how many transformations you apply to an RDD, none of them actually run until an action is invoked.

The reason lies in how Spark works. Until an action occurs, Spark only records metadata about the transformations applied to an RDD (its lineage); when an action fires, Spark uses that metadata to actually process the RDD. This lazy evaluation helps Spark use memory efficiently.

This post covers the basics of using RDDs, and starting from here I plan to write follow-up posts on Spark SQL, Spark Cluster (Standalone), Spark Streaming, and more.

The examples in this post use Java 1.8 and Spark 2.0.1.


How to Use RDDs

1. Applying Only a Transformation

As explained in the introduction, let's first confirm that a transformation applied to an RDD does nothing until an action is invoked.

 
package sparks;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDEx {
	public static void main(String[] args) {
		// Application name plus a local, single-threaded master
		SparkConf conf = new SparkConf().setAppName("RDD Ex").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> testList = new ArrayList<>();
		
		testList.add("test1");
		testList.add("test2");
		testList.add("test3");
		
		// Create an RDD from an in-memory collection
		JavaRDD<String> inputRDD = sc.parallelize(testList);
		
		// map is a transformation: it is only recorded here, not executed
		JavaRDD<String> newRDD = inputRDD.map((line) -> line + "Transformate");
	}
}


To walk through the source briefly: a Spark application needs, at minimum, a SparkConf and a JavaSparkContext (the context class differs when using Scala, Python, or R). SparkConf holds the configuration Spark uses while running, and values set directly on SparkConf take the highest precedence, above flags passed to spark-submit and settings in the properties file.
After configuration, creating an RDD is as simple as building a List of Strings and passing it to sc.parallelize.
The last line applies the map transformation to inputRDD. Running the program in this state produces the log below.


 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/24 00:32:38 INFO SparkContext: Running Spark version 2.0.1
17/05/24 00:32:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 00:32:38 INFO SecurityManager: Changing view acls to: birdhead
17/05/24 00:32:38 INFO SecurityManager: Changing modify acls to: birdhead
17/05/24 00:32:38 INFO SecurityManager: Changing view acls groups to: 
17/05/24 00:32:38 INFO SecurityManager: Changing modify acls groups to: 
17/05/24 00:32:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(birdhead); groups with view permissions: Set(); users  with modify permissions: Set(birdhead); groups with modify permissions: Set()
17/05/24 00:32:39 INFO Utils: Successfully started service 'sparkDriver' on port 54110.
17/05/24 00:32:39 INFO SparkEnv: Registering MapOutputTracker
17/05/24 00:32:39 INFO SparkEnv: Registering BlockManagerMaster
17/05/24 00:32:39 INFO DiskBlockManager: Created local directory at /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/blockmgr-f8cffcf9-8fa2-4497-9e7a-35e64f058e6d
17/05/24 00:32:39 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/24 00:32:39 INFO SparkEnv: Registering OutputCommitCoordinator
17/05/24 00:32:39 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/05/24 00:32:39 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://***.***.***.***:4040
17/05/24 00:32:39 INFO Executor: Starting executor ID driver on host localhost
17/05/24 00:32:39 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54111.
17/05/24 00:32:39 INFO NettyBlockTransferService: Server created on ***.***.***.***:54111
17/05/24 00:32:39 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ***.***.***.***, 54111)
17/05/24 00:32:39 INFO BlockManagerMasterEndpoint: Registering block manager ***.***.***.***:54111 with 2004.6 MB RAM, BlockManagerId(driver, ***.***.***.***, 54111)
17/05/24 00:32:39 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ***.***.***.***, 54111)
17/05/24 00:32:39 INFO SparkContext: Invoking stop() from shutdown hook
17/05/24 00:32:39 INFO SparkUI: Stopped Spark web UI at http://***.***.***.***:4040
17/05/24 00:32:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/24 00:32:39 INFO MemoryStore: MemoryStore cleared
17/05/24 00:32:39 INFO BlockManager: BlockManager stopped
17/05/24 00:32:39 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/24 00:32:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/24 00:32:39 INFO SparkContext: Successfully stopped SparkContext
17/05/24 00:32:39 INFO ShutdownHookManager: Shutdown hook called
17/05/24 00:32:39 INFO ShutdownHookManager: Deleting directory /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/spark-c75423d7-bb80-4e70-87f4-2412ca929bbd

As the log shows, the application simply starts up and shuts down: there are no job, stage, or task entries, because the map transformation was recorded but never executed.
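This laziness applies to whole chains of transformations, not just a single map. The sketch below is my own variation on the example above (the class name and filter condition are illustrative, not from the original post); as before, neither transformation runs because no action follows:

```java
package sparks;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyChainEx {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Lazy Chain Ex").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		JavaRDD<String> inputRDD = sc.parallelize(Arrays.asList("test1", "test2", "test3"));

		// Both transformations below are only recorded as lineage metadata;
		// no job is submitted because no action is ever called.
		JavaRDD<String> chained = inputRDD
				.map(line -> line + "Transformate")
				.filter(line -> line.startsWith("test1"));

		sc.stop();
	}
}
```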

2. Applying an Action


 
package sparks;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDEx {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("RDD Ex").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> testList = new ArrayList<>();
		
		testList.add("test1");
		testList.add("test2");
		testList.add("test3");
		
		JavaRDD<String> inputRDD = sc.parallelize(testList);
		
		JavaRDD<String> newRDD = inputRDD.map((line) -> line + "Transformate");
		
		// collect is an action: it triggers the map above and
		// returns the results to the driver as a List
		List<String> strList = newRDD.collect();
		
		for (String str : strList) {
			System.out.println(str);
		}
	}
}

The rest of the code is identical; the only addition is the collect action applied to newRDD. With the action in place, the log below differs from the transformation-only run.


 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/birdhead/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/05/24 00:45:46 INFO SparkContext: Running Spark version 2.0.1
17/05/24 00:45:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 00:45:46 INFO SecurityManager: Changing view acls to: birdhead
17/05/24 00:45:46 INFO SecurityManager: Changing modify acls to: birdhead
17/05/24 00:45:46 INFO SecurityManager: Changing view acls groups to: 
17/05/24 00:45:46 INFO SecurityManager: Changing modify acls groups to: 
17/05/24 00:45:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(birdhead); groups with view permissions: Set(); users  with modify permissions: Set(birdhead); groups with modify permissions: Set()
17/05/24 00:45:47 INFO Utils: Successfully started service 'sparkDriver' on port 54155.
17/05/24 00:45:47 INFO SparkEnv: Registering MapOutputTracker
17/05/24 00:45:47 INFO SparkEnv: Registering BlockManagerMaster
17/05/24 00:45:47 INFO DiskBlockManager: Created local directory at /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/blockmgr-a4bf01fe-0323-4361-b8be-614472214249
17/05/24 00:45:47 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/05/24 00:45:47 INFO SparkEnv: Registering OutputCommitCoordinator
17/05/24 00:45:47 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/05/24 00:45:47 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://***.***.***.***:4040
17/05/24 00:45:47 INFO Executor: Starting executor ID driver on host localhost
17/05/24 00:45:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54156.
17/05/24 00:45:47 INFO NettyBlockTransferService: Server created on ***.***.***.***:54156
17/05/24 00:45:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO BlockManagerMasterEndpoint: Registering block manager ***.***.***.***:54156 with 2004.6 MB RAM, BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ***.***.***.***, 54156)
17/05/24 00:45:47 INFO SparkContext: Starting job: collect at RDDEx.java:25
17/05/24 00:45:47 INFO DAGScheduler: Got job 0 (collect at RDDEx.java:25) with 1 output partitions
17/05/24 00:45:47 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDDEx.java:25)
17/05/24 00:45:47 INFO DAGScheduler: Parents of final stage: List()
17/05/24 00:45:47 INFO DAGScheduler: Missing parents: List()
17/05/24 00:45:47 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at RDDEx.java:23), which has no missing parents
17/05/24 00:45:47 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 2004.6 MB)
17/05/24 00:45:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1675.0 B, free 2004.6 MB)
17/05/24 00:45:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ***.***.***.***:54156 (size: 1675.0 B, free: 2004.6 MB)
17/05/24 00:45:47 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
/******** Difference 1 ********/
17/05/24 00:45:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at RDDEx.java:23) 
17/05/24 00:45:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/05/24 00:45:48 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5412 bytes)
17/05/24 00:45:48 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/05/24 00:45:48 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 816 bytes result sent to driver
17/05/24 00:45:48 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 61 ms on localhost (1/1)
17/05/24 00:45:48 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
/******** Difference 2 ********/
17/05/24 00:45:48 INFO DAGScheduler: ResultStage 0 (collect at RDDEx.java:25) finished in 0.074 s
17/05/24 00:45:48 INFO DAGScheduler: Job 0 finished: collect at RDDEx.java:25, took 0.220065 s 
test1Transformate
test2Transformate
test3Transformate
17/05/24 00:45:48 INFO SparkContext: Invoking stop() from shutdown hook
17/05/24 00:45:48 INFO SparkUI: Stopped Spark web UI at http://***.***.***.***:4040
17/05/24 00:45:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/24 00:45:48 INFO MemoryStore: MemoryStore cleared
17/05/24 00:45:48 INFO BlockManager: BlockManager stopped
17/05/24 00:45:48 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/24 00:45:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/24 00:45:48 INFO SparkContext: Successfully stopped SparkContext
17/05/24 00:45:48 INFO ShutdownHookManager: Shutdown hook called
17/05/24 00:45:48 INFO ShutdownHookManager: Deleting directory /private/var/folders/x3/w_wh6rfn7bdf8thb_g0yxvb80000gn/T/spark-980aa1a2-70af-4e57-8c42-d85e65c34a93

That is the output with the action applied.

Looking between the /******** Difference ********/ markers in the log, you can see that the transformation actually ran this time: a job was submitted, a stage was scheduled, a task executed, and the transformed strings were printed.
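collect is only one of Spark's actions. The sketch below (my own addition, not part of the original post) shows a few other common JavaRDD actions on the same data; each call triggers its own job, just as collect did:

```java
package sparks;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ActionsEx {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("Actions Ex").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		JavaRDD<String> newRDD = sc
				.parallelize(Arrays.asList("test1", "test2", "test3"))
				.map(line -> line + "Transformate");

		// Each call below is an action and submits a separate job.
		long total = newRDD.count();            // number of elements
		String head = newRDD.first();           // first element
		List<String> firstTwo = newRDD.take(2); // first two elements

		System.out.println(total + " / " + head + " / " + firstTwo);

		sc.stop();
	}
}
```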


Conclusion

In this post we took a quick look at Spark's RDD. Spark provides several ways to create RDDs, such as reading from files or databases. The key takeaway is understanding how the two kinds of operations applied to an RDD, transformations and actions, actually behave.
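As mentioned above, reading a file is another common way to create an RDD, via sc.textFile. A minimal sketch follows; note that "input.txt" is a hypothetical path for illustration, not a file from this post:

```java
package sparks;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TextFileEx {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("TextFile Ex").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// textFile is also lazy: the file is not read at this point.
		// "input.txt" is a placeholder path for illustration.
		JavaRDD<String> lines = sc.textFile("input.txt");

		// count is the action that actually triggers reading the file.
		long lineCount = lines.count();
		System.out.println("lines: " + lineCount);

		sc.stop();
	}
}
```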
