Actions

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacementnum, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n[ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path
(Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path
(Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.



출처 : https://spark.apache.org/docs/latest/programming-guide.html#Actions

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Transformations

Transformation

Meaning

map(func)

func를 통해서 새로은 데이터 셋을 만든다.

filter(func)

func의 결과와 일치하면 true/ 그렇지 않으면 false를 반환하여 새로운 데이터셋을 형성할 때 true값만 모아서 만든다.

flatMap(func)

map과 비슷하지만 데이터셋이 list들로 반환된다.

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample(withReplacementfractionseed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command[envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.


출처 : https://spark.apache.org/docs/latest/programming-guide.html#transformations

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Map과 FlatMap은 문자열과 같은 것들을 분리하는 역할을 한다. 각 차이점에 대해서 알아보자.


Map Function

split 하면 list of list로 반환한다. 그래서 접근하기가 까다롭다.




flatMap

split시에 list로 반환 하기때문에 접근하기 쉽다.


text = sc.textFile("short")

wc = text.flatMap(lambda line : line.split()).map(lambda word : (word.1)).reduceByKey(lamdba c1, c2 : c1+c2)

wc.collect()



'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

ReduceByKey

Key값을 가져야한다.

같은 node의 같은 key값 기준으로 values를 미리 병합하여 suffling한다.

reduceByKey에 function을 같이 넘겨 줄수 있다.

accociate / commutative (+, x만 사용) 


countByValue : return값이 dictionary


Picture

GroupByKey 

각 노드에서 병합하지않고 shuflling한후에 병합하기 때문에 네트워크부하가 많이 걸려 효율성이 떨어진다.(그래서 추천X)

groupbyKey 후에 function을 적용해서 값을 구한다.

K는 그대론데V는 iterableObject로 결과가 나온다.

Picture


그림출처 : http://www.ruxizhang.com/blog/spark-difference-between-reducebykey-groupbykey

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

기본 개념 

RDD는 여러 분산 노드에 걸쳐서 저장되는 변경이 불가능한 데이터(객체)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. (서로 다른 노드에서 분리되서 실행되는). 쉽게 말해서 스파크 내에 저장된 데이터를 RDD라고 하고, 변경이 불가능하다. 변경을 하려면 새로운 데이터 셋을 생성해야 한다. RDD의 생성은 외부로 부터 데이터를 로딩하거나 또는 코드에서 생성된 데이터를 저장함으로써 생성할 수 있다.


분산되어 존재하는 데이터 요소들의 모임이다. RDDjavaString처럼 변경이 불가능한 객체(immutable)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. 변경을 하기 위해서는 새로운 RDD를 만들거나, 존재하는 RDD를 변형, 결과 계산을 위해 RDD에서 연산을 호출하여 생성을 해야 한다. Spark는 자동으로 RDD에 있는 데이터들을 클러스터에 분배, 수행하는 연산들을 병렬화 한다. SparkRDDlazy evaluation으로 액션을 사용하는 시점에서 처리하기 때문에, 구현할때는 transformationactionoperation을 확실히 이해하고 있어야 결과를 받아보는데 효율적으로 구현이 가능하다. 예를 들어 한 라인에 action이 포함된 transformation을 할 경우에는 결과를 출력하기 때문에 불필요한 반환 값을 받을 수 있다. action을 만나기 전까지 transformation을 처리하지 않는다. RDD는 데이터를 갖고 있기 보다는 lineage를 가지고 있다.

 

RDD 생성하기

1) 외부로 부터 파일을 읽어서 로딩하거나 파일은 일반 파일을 읽거나 S3,HBase,HDFS,Cassandra 등에서 데이터를 읽어오는 방법.

파이썬 예제) lines = sc.textFile(“/path/filename.txt”)

 

2) 드라이버 프로그램내에서 생성된 collection을 parallelize() 라는 메서드를 이용해서 RDD 화하는 방법(자바 컬렉션등을 RDD로 생성)

자바 예제) JavaRDD<String> lines = sc.parallelize(Array.asList(“first”,”second”))

Lazy Execution

RDD의 데이터 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어 sc.textFile(“파일”)로 파일을 로딩하더라도 실제로 로딩이 되지 않는다. 파일이 로딩되서 메모리에 올라가는 시점은 action을 이용해서 개선할 당시만 올라간다. 결국 lineage(계보, 혈통)만 가지고 있다.

장점 : 효용성이 좋다.

단점 : Actions시에 시간이 많이 걸린다.

그래서 RDDTranformation하다가 적당할때 Action한다.

 

그렇다면, 언제 실제 README.md 파일이 읽혀질까? 실제로 읽혀지는 시기는 README.md 파일을 sc.textFile로 오픈할 때가 아니라 .count() 라는 액션이 수행될 때 이다.

이유는 파일을 오픈할 때부터 RDD를 메모리에 올려놓게 되면 데이터가 클 경우, 전체가 메모리에 올라가야 하는데, 일반적으로 filter 등을 이용해서 데이터를 정재한 후에, action을 수행하기 때문에, action을 수행할 때, action수행 시 필요한 부분만 (filter가 적용된 부분만) 메모리에 올리면 훨씬 작은 부분을 올릴 수 있기 때문에 수행 시에 데이터를 로딩하게 된다.

 

그렇다면 로딩된 데이터는 언제 지워질까?

action을 수행한 다음 바로 지워진다. 만약에, 한번 읽어드린 RDD를 메모리에 상주하고 계속해서 재사용하고 싶다면 RDD.persist()라는 메서드를 이용하면, RDD를 메모리에 상주 시킬 수 있다.


RDD Operations

1) Transformation (변환)

기존의 RDD 데이터를 변경하여 새로운 RDD 데이터를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이터만 뽑아내거나 map 함수처럼, 데이터를 분산 배치하는 것 등을 들 수 있다.

변환은 RDD를 필터링하거나 변환하여 새로운 RDD를 리턴하는 오퍼레이션이다.

다음 코드는 README.md 라는 파일을 읽어서 f 라는 RDD를 생성한후

f라는 RDD 에서 “Apache”라는 문자열을 가진 라인만을 모아서 t라는 RDD를 새롭게 생성한 후 화면으로 출력하는 예제이다.

ft는 전혀 다른 RDDRDD tfilter에 의해서 새롭게 생성되었다.

변환 함수는 filter 뿐 아니라, map, group등 여러가지 함수들이 있다.

https://spark.apache.org/docs/latest/programming-guide.html#transformations

 

2) Action (액션)

RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

액션은 RDD를 가지고 계산을 해서 최종 결과를 리턴하거나 또는 데이터를 외부 저장소(External Storage)등에 쓸 수 있다.

최종 결과를 리턴하는 오퍼레이션으로는 앞의 예제에서도 설명한 count(), 첫번째 element를 리턴하는 first등이 있으며, RDD를 저장하는 오퍼레이션으로는 saveAsTextFile(path)와 같은 오퍼레이션 등이 있다.

https://spark.apache.org/docs/latest/programming-guide.html#actions

 

 

참조 : http://bcho.tistory.com/1027 [조대협의 블로그]

http://ourcstory.tistory.com/127 [쌍쌍바나나의 블로그]

본 포스팅을 오라일사의 "Learning Spark" Sparing Programming Guide를 참고하여 작성하였습니다. https://spark.apache.org/docs/latest/programming-guide.html

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11

+ Recent posts