RDD 기초
스파크의 RDD는 분산되어 있는 변경 불가능한 객체 모음이다.
각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다.
RDD는 외부 데이터세트를 로드하거나 드라이버 프로그램에서 객체 컬랙션을 분산시키는 두 가지 방법 중 하나로 만들 수 있다.
1 |
|
한 번 만들어진 RDD는 두 가지 타입의 연산을 지원한다.
트렌스포메이션
존재하는 RDD에서 새로운 RDD를 만들어 낸다.
1 |
|
액션
RDD를 기초로 결과 값을 계산한다.
그 값을 드라이버 프로그램에 되돌려 주거나 외부 스토리지에 저장하기도 한다.
1 |
|
스파크는 RDD를 여유로운 방식(lazy evaluation)으로 처음 액션을 사용하는 시점에 처리한다. (자바의 stream 처럼)
액션이 실행될 때마다 매번 새로운 연산을 한다. 만약 여러 액션에서 RDD 하나를 재사용하고 싶으면 스파크에 RDD.persist()
를 사용하여 결과를 유지하도록 요청할 수 있다.
RDD 생성
스파크의 RDD는 두 가지 방법으로 생성할 수 있다.
SparkContext.parallelize()
이 방식은 셸에서 바로 자신만의 RDD를 만들 수 있고 연산을 수행할 수 있다. 그러나 이 방식은 하나의 머신 메모리에 모든 데이터세트를 담고 있으므로 널리 쓰지 않는다.
1 |
|
외부 스토리지 로드
외부 스토리지에서 데이터를 불러올 수 있으며 가장 맣ㄴ이 사용하는 것은 SparkContext.textFile()
이다.
1 |
|
많이 쓰이는 트랜스포메이션과 액션
트랜스포메이션
{1, 2, 3, 3}
을 가지고 있는 RDD에 대한 RDD 트랜스포메이션
함수 이름 | 용도 | 예 | 결과 |
---|---|---|---|
map() |
RDD의 각 요소에 함수를 적용하고 결과 RDD를 되돌려준다. | rdd.map(x => x+1) |
{2, 3, 4, 4} |
flatMap() |
RDD의 각 요소에 함수를 적용하고 반환된 반복자의 내용들로 이루어진 RDD를 되돌려 준다. 종종 단어 분해를 위해 쓰인다. | rdd.flatMap(x => x.to(3)) |
{1, 2, 3, 2, 3, 3, 3} |
filter() |
filter() 로 전달된 함수의 조건을 통과한 값으로만 이루어진 RDD를 되돌려준다. |
rdd.filter(x => x != 1) |
{2, 3, 3} |
distinct() |
중복 제거 | rdd.distinct() |
{1, 2, 3} |
sample(wtihReplacement, fraction, [seed] |
복원 추출(withReplacement=true)이나 비복원 추출로 RDD에서 표본을 뽑아낸다. | rdd.sample(false, 0.5) |
생략 |
{1, 2, 3}
과 {3, 4, 5}
를 가진 두 RDD에 대한 트렌스포메이션
함수 이름 | 용도 | 예 | 결과 |
---|---|---|---|
union() |
두 RDD에 있는 데이터들을 합한 RDD를 생성한다. | rdd.union(other) |
{1, 2, 3, 3, 4, 5} |
intersection() |
양쪽 RDD에 모두 있는 데이터들만을 가진 RDD를 반환한다. | rdd.intersection(other) |
{3} |
subtract() |
한 RDD가 가진 데이터를 다른 쪽에서 삭제한다. | rdd.subtract(other) |
{1, 2} |
cartesian() |
두 RDD 데이터의 카테시안 곱 | rdd.cartesian(other) |
{(1, 3), (1, 4) .... (3, 5)} |
액션
{1, 2, 3 ,3}
을 가지고 있는 RDD에 대한 액션
함수 이름 | 용도 | 예 | 결과 |
---|---|---|---|
collect() |
RDD의 모든 데이터 요소 리턴 | rdd.collect() |
{1, 2, 3, 3} |
count() |
RDD의 요소 개수 리턴 | rdd.count() |
4 |
countByValue() |
RDD에 있는 각 값의 개수 리턴 | rdd.countByValue() |
{(1, 1), (2, 1), (3, 2)} |
take(num) |
RDD의 값들 중 num개 리턴 | rdd.take(2) |
{1, 2} |
top(num) |
RDD의 값들 중 상위 num개 리턴 | rdd.top(2) |
{3, 3} |
takeOrdered(num)(ordering) |
제공된 ordering 기준으로 num개 리턴 | rdd.takeOrdered(2)(myOrdering) |
{3, 3} |
takeSample(withReplacement, num, [seed]) |
무작위 값들 리턴 | rdd.takeSample(false, 1) |
생략 |
reduce(func) |
RDD의 값들을 병렬로 병합 연산한다. | rdd.rdeuce((x, y) => x + y) |
9 |
fold(zero)(func) |
reduce() 와 동일하지만 제로 벨류를 넣어준다. |
rdd.fold(0)((x, y) => x + y) |
9 |
aggregate(zeroValue)(seqOp, combOp) |
reduce() 와 유사하나 다른 타입을 리턴한다. |
rdd.aggregate((0, 0))((x, y) => (x._1 + y, x_2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) |
(9, 4) |
foreach(func) |
RDD의 각 값에 func를 적용한다. | rdd.foreach(func) |
없음 |