Spark RDD

RDD 기초

스파크의 RDD는 분산되어 있는 변경 불가능한 객체 모음이다.

각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다.

RDD는 외부 데이터세트를 로드하거나 드라이버 프로그램에서 객체 컬랙션을 분산시키는 두 가지 방법 중 하나로 만들 수 있다.

1
2
3

# 파이썬에서 textFile()로 문자열 RDD만들기
lines = sc.textFile("README.md")

한 번 만들어진 RDD는 두 가지 타입의 연산을 지원한다.

트렌스포메이션

존재하는 RDD에서 새로운 RDD를 만들어 낸다.

1
pythonLines = lines.filter(lambda line: "Python" in line)

액션

RDD를 기초로 결과 값을 계산한다.
그 값을 드라이버 프로그램에 되돌려 주거나 외부 스토리지에 저장하기도 한다.

1
pythonLines.first()

스파크는 RDD를 여유로운 방식(lazy evaluation)으로 처음 액션을 사용하는 시점에 처리한다. (자바의 stream 처럼)

액션이 실행될 때마다 매번 새로운 연산을 한다. 만약 여러 액션에서 RDD 하나를 재사용하고 싶으면 스파크에 RDD.persist()를 사용하여 결과를 유지하도록 요청할 수 있다.

RDD 생성

스파크의 RDD는 두 가지 방법으로 생성할 수 있다.

SparkContext.parallelize()

이 방식은 셸에서 바로 자신만의 RDD를 만들 수 있고 연산을 수행할 수 있다. 그러나 이 방식은 하나의 머신 메모리에 모든 데이터세트를 담고 있으므로 널리 쓰지 않는다.

1
lines = sc.parallelize(["pandas", "i like pandas"])

외부 스토리지 로드

외부 스토리지에서 데이터를 불러올 수 있으며 가장 맣ㄴ이 사용하는 것은 SparkContext.textFile()이다.

1
lines = sc.textFile("/path/to/README.md")

많이 쓰이는 트랜스포메이션과 액션

트랜스포메이션

{1, 2, 3, 3}을 가지고 있는 RDD에 대한 RDD 트랜스포메이션

함수 이름 용도 결과
map() RDD의 각 요소에 함수를 적용하고 결과 RDD를 되돌려준다. rdd.map(x => x+1) {2, 3, 4, 4}
flatMap() RDD의 각 요소에 함수를 적용하고 반환된 반복자의 내용들로 이루어진 RDD를 되돌려 준다. 종종 단어 분해를 위해 쓰인다. rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() filter()로 전달된 함수의 조건을 통과한 값으로만 이루어진 RDD를 되돌려준다. rdd.filter(x => x != 1) {2, 3, 3}
distinct() 중복 제거 rdd.distinct() {1, 2, 3}
sample(wtihReplacement, fraction, [seed] 복원 추출(withReplacement=true)이나 비복원 추출로 RDD에서 표본을 뽑아낸다. rdd.sample(false, 0.5) 생략

{1, 2, 3}{3, 4, 5}를 가진 두 RDD에 대한 트렌스포메이션

함수 이름 용도 결과
union() 두 RDD에 있는 데이터들을 합한 RDD를 생성한다. rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() 양쪽 RDD에 모두 있는 데이터들만을 가진 RDD를 반환한다. rdd.intersection(other) {3}
subtract() 한 RDD가 가진 데이터를 다른 쪽에서 삭제한다. rdd.subtract(other) {1, 2}
cartesian() 두 RDD 데이터의 카테시안 곱 rdd.cartesian(other) {(1, 3), (1, 4) .... (3, 5)}

액션

{1, 2, 3 ,3}을 가지고 있는 RDD에 대한 액션

함수 이름 용도 결과
collect() RDD의 모든 데이터 요소 리턴 rdd.collect() {1, 2, 3, 3}
count() RDD의 요소 개수 리턴 rdd.count() 4
countByValue() RDD에 있는 각 값의 개수 리턴 rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) RDD의 값들 중 num개 리턴 rdd.take(2) {1, 2}
top(num) RDD의 값들 중 상위 num개 리턴 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 제공된 ordering 기준으로 num개 리턴 rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplacement, num, [seed]) 무작위 값들 리턴 rdd.takeSample(false, 1) 생략
reduce(func) RDD의 값들을 병렬로 병합 연산한다. rdd.rdeuce((x, y) => x + y) 9
fold(zero)(func) reduce()와 동일하지만 제로 벨류를 넣어준다. rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) reduce()와 유사하나 다른 타입을 리턴한다. rdd.aggregate((0, 0))((x, y) => (x._1 + y, x_2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) (9, 4)
foreach(func) RDD의 각 값에 func를 적용한다. rdd.foreach(func) 없음