Actions

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacementnum, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n[ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path
(Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path
(Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.



출처 : https://spark.apache.org/docs/latest/programming-guide.html#Actions

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Transformations

Transformation

Meaning

map(func)

func를 통해서 새로은 데이터 셋을 만든다.

filter(func)

func의 결과와 일치하면 true/ 그렇지 않으면 false를 반환하여 새로운 데이터셋을 형성할 때 true값만 모아서 만든다.

flatMap(func)

map과 비슷하지만 데이터셋이 list들로 반환된다.

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample(withReplacementfractionseed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command[envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.


출처 : https://spark.apache.org/docs/latest/programming-guide.html#transformations

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Map과 FlatMap은 문자열과 같은 것들을 분리하는 역할을 한다. 각 차이점에 대해서 알아보자.


Map Function

split 하면 list of list로 반환한다. 그래서 접근하기가 까다롭다.




flatMap

split시에 list로 반환 하기때문에 접근하기 쉽다.


text = sc.textFile("short")

wc = text.flatMap(lambda line : line.split()).map(lambda word : (word.1)).reduceByKey(lamdba c1, c2 : c1+c2)

wc.collect()



'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

ReduceByKey

Key값을 가져야한다.

같은 node의 같은 key값 기준으로 values를 미리 병합하여 suffling한다.

reduceByKey에 function을 같이 넘겨 줄수 있다.

accociate / commutative (+, x만 사용) 


countByValue : return값이 dictionary


Picture

GroupByKey 

각 노드에서 병합하지않고 shuflling한후에 병합하기 때문에 네트워크부하가 많이 걸려 효율성이 떨어진다.(그래서 추천X)

groupbyKey 후에 function을 적용해서 값을 구한다.

K는 그대론데V는 iterableObject로 결과가 나온다.

Picture


그림출처 : http://www.ruxizhang.com/blog/spark-difference-between-reducebykey-groupbykey

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

기본 개념 

RDD는 여러 분산 노드에 걸쳐서 저장되는 변경이 불가능한 데이터(객체)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. (서로 다른 노드에서 분리되서 실행되는). 쉽게 말해서 스파크 내에 저장된 데이터를 RDD라고 하고, 변경이 불가능하다. 변경을 하려면 새로운 데이터 셋을 생성해야 한다. RDD의 생성은 외부로 부터 데이터를 로딩하거나 또는 코드에서 생성된 데이터를 저장함으로써 생성할 수 있다.


분산되어 존재하는 데이터 요소들의 모임이다. RDDjavaString처럼 변경이 불가능한 객체(immutable)의 집합으로 각각의 RDD는 여러개의 파티션으로 분리가 된다. 변경을 하기 위해서는 새로운 RDD를 만들거나, 존재하는 RDD를 변형, 결과 계산을 위해 RDD에서 연산을 호출하여 생성을 해야 한다. Spark는 자동으로 RDD에 있는 데이터들을 클러스터에 분배, 수행하는 연산들을 병렬화 한다. SparkRDDlazy evaluation으로 액션을 사용하는 시점에서 처리하기 때문에, 구현할때는 transformationactionoperation을 확실히 이해하고 있어야 결과를 받아보는데 효율적으로 구현이 가능하다. 예를 들어 한 라인에 action이 포함된 transformation을 할 경우에는 결과를 출력하기 때문에 불필요한 반환 값을 받을 수 있다. action을 만나기 전까지 transformation을 처리하지 않는다. RDD는 데이터를 갖고 있기 보다는 lineage를 가지고 있다.

 

RDD 생성하기

1) 외부로 부터 파일을 읽어서 로딩하거나 파일은 일반 파일을 읽거나 S3,HBase,HDFS,Cassandra 등에서 데이터를 읽어오는 방법.

파이썬 예제) lines = sc.textFile(“/path/filename.txt”)

 

2) 드라이버 프로그램내에서 생성된 collection을 parallelize() 라는 메서드를 이용해서 RDD 화하는 방법(자바 컬렉션등을 RDD로 생성)

자바 예제) JavaRDD<String> lines = sc.parallelize(Array.asList(“first”,”second”))

Lazy Execution

RDD의 데이터 로딩 방식은 Lazy 로딩 컨셉을 사용하는데, 예를 들어 sc.textFile(“파일”)로 파일을 로딩하더라도 실제로 로딩이 되지 않는다. 파일이 로딩되서 메모리에 올라가는 시점은 action을 이용해서 개선할 당시만 올라간다. 결국 lineage(계보, 혈통)만 가지고 있다.

장점 : 효용성이 좋다.

단점 : Actions시에 시간이 많이 걸린다.

그래서 RDDTranformation하다가 적당할때 Action한다.

 

그렇다면, 언제 실제 README.md 파일이 읽혀질까? 실제로 읽혀지는 시기는 README.md 파일을 sc.textFile로 오픈할 때가 아니라 .count() 라는 액션이 수행될 때 이다.

이유는 파일을 오픈할 때부터 RDD를 메모리에 올려놓게 되면 데이터가 클 경우, 전체가 메모리에 올라가야 하는데, 일반적으로 filter 등을 이용해서 데이터를 정재한 후에, action을 수행하기 때문에, action을 수행할 때, action수행 시 필요한 부분만 (filter가 적용된 부분만) 메모리에 올리면 훨씬 작은 부분을 올릴 수 있기 때문에 수행 시에 데이터를 로딩하게 된다.

 

그렇다면 로딩된 데이터는 언제 지워질까?

action을 수행한 다음 바로 지워진다. 만약에, 한번 읽어드린 RDD를 메모리에 상주하고 계속해서 재사용하고 싶다면 RDD.persist()라는 메서드를 이용하면, RDD를 메모리에 상주 시킬 수 있다.


RDD Operations

1) Transformation (변환)

기존의 RDD 데이터를 변경하여 새로운 RDD 데이터를 생성해내는 것. 흔한 케이스는 filter와 같이 특정 데이터만 뽑아내거나 map 함수처럼, 데이터를 분산 배치하는 것 등을 들 수 있다.

변환은 RDD를 필터링하거나 변환하여 새로운 RDD를 리턴하는 오퍼레이션이다.

다음 코드는 README.md 라는 파일을 읽어서 f 라는 RDD를 생성한후

f라는 RDD 에서 “Apache”라는 문자열을 가진 라인만을 모아서 t라는 RDD를 새롭게 생성한 후 화면으로 출력하는 예제이다.

ft는 전혀 다른 RDDRDD tfilter에 의해서 새롭게 생성되었다.

변환 함수는 filter 뿐 아니라, map, group등 여러가지 함수들이 있다.

https://spark.apache.org/docs/latest/programming-guide.html#transformations

 

2) Action (액션)

RDD 값을 기반으로 무엇인가를 계산해서(computation) 결과를 (셋이 아닌) 생성해 내는것으로 가장 쉬운 예로는 count()와 같은 operation들을 들 수 있다.

액션은 RDD를 가지고 계산을 해서 최종 결과를 리턴하거나 또는 데이터를 외부 저장소(External Storage)등에 쓸 수 있다.

최종 결과를 리턴하는 오퍼레이션으로는 앞의 예제에서도 설명한 count(), 첫번째 element를 리턴하는 first등이 있으며, RDD를 저장하는 오퍼레이션으로는 saveAsTextFile(path)와 같은 오퍼레이션 등이 있다.

https://spark.apache.org/docs/latest/programming-guide.html#actions

 

 

참조 : http://bcho.tistory.com/1027 [조대협의 블로그]

http://ourcstory.tistory.com/127 [쌍쌍바나나의 블로그]

본 포스팅을 오라일사의 "Learning Spark" Sparing Programming Guide를 참고하여 작성하였습니다. https://spark.apache.org/docs/latest/programming-guide.html

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11

람다 형식은 인공지능 분야나 AutoCAD라는 설계 프로그램에서 쓰이는 Lisp 언어에서 물려받았다고 한다.

 

사용법

lambda(인자 : 표현식)

function으로

>>> def hap(x, y):

... return x + y

...

>>> hap(10, 20)

30

 

람다 형식으로

>>> (lambda x, y :  x + y)(10, 20)

30

map(함수, 리스트)

이 함수는 함수와 리스트를 인자로 받는다. 그리고, 리스트로부터 원소를 하나씩 꺼내서 함수를 적용시킨 다음, 그 결과를 새로운 리스트에 담아준다.

>>> list(map(lambda x: x ** 2, range(5)))

[0, 1, 4, 9, 16]

 

reduce(함수, 순서형 자료)

순서형 자료(문자열, 리스트, 튜플)의 원소들을 누적적으로 함수에 적용시킨다.

>>> from functools import reduce

>>> reduce(lambda x, y: x + y, [0, 1, 2, 3, 4])

10

 

filter(함수, 리스트)

리스트에 들어있는 원소들을 함수에 적용시켜서 결과가 참인 값들로 새로운 리스트를 만들어준다.

>>> list(filter(lambda x: x < 5, range(10)))

[0, 1, 2, 3, 4]


'Programming > Python' 카테고리의 다른 글

파일 입출력  (0) 2017.05.08
사용자 입출력  (0) 2017.05.08
함수  (0) 2017.05.08
for 문  (0) 2017.05.08
while 문  (0) 2017.05.08

사람답게 산다는 것은 무엇을 뜻하는 걸까

나는 과연 사람답게 살고 있는것일까.


사람은 자신을 표현하면서 사는 동물이다. 사람답다는 것은 자신을 표현하면서 살아야 한다는 뜻이다. 자신을 표현하면서 사는 것은 어떤 삶일까사람은 일로써 자신을 표현한다고 생각한다. 물론 일 이외에도 취미생활이나, 다른 활동으로써 자신을 표현 할 수 있지만, 삶에서 가장 많은 비중을 차지하는 것이 잠자는 것 다음이 일이기 때문에 그렇다고 할 수 있겠다


 우선 표현의 사전적 정의부터 알아보도록 하자. 표현의 사전적 정의는  "생각이나 느낌 따위를 언어나 몸짓 따위의 형상으로 드러내어 나타냄." 이다. 내안에 내재되어 있는 무언가를 밖으로 표출 시키는것. 그것이 표현이다. 밖으로 표출시키는 것은 어떤 활동으로 나타날 수 있다. 다시 말해서 내가 생각하는 것을 일로써 나타내는것이 표현이라 할 수 있을 것이다.


'Think' 카테고리의 다른 글

엔지니어와 아티스트  (0) 2017.05.13
기계와 인간의 경계  (4) 2017.05.13
기계가 인간은 대신한다.  (0) 2017.04.11
(10/9)말씀  (0) 2016.10.09
소프트웨어 교육의 이유  (0) 2016.10.05

파일 생성하기

f = open("새파일.txt", 'w')

f.close()

파일 객체 = open(파일 이름, 파일 열기 모드)

파일 열기 모드

파일열기모드

설명

r

읽기모드 - 파일을 읽기만 할 때 사용

w

쓰기모드 - 파일에 내용을 쓸 때 사용

a

추가모드 - 파일의 마지막에 새로운 내용을 추가 시킬 때 사용

 

파일을 쓰기 모드로 열게 되면 해당 파일이 이미 존재할 경우 원래 있던 내용이 모두 사라지고, 해당 파일이 존재하지 않으면 새로운 파일이 생성된다.

 

파일을 쓰기 모드로 열어 출력값 적기

 

f = open("C:/Python/새파일.txt", 'w')

for i in range(1, 11):

    data = "%d번째 줄입니다.\n" % i

    f.write(data)

f.close()

위의 프로그램을 다음 프로그램과 비교해 보자.

 

for i in range(1, 11):

    data = "%d번째 줄입니다.\n" % i

print(data)

두 프로그램의 다른 점은 data를 출력하는 방법이다. 두 번째 방법은 우리가 계속 사용해 왔던 모니터 화면에 출력하는 방법이고, 첫 번째 방법은 모니터 화면 대신 파일에 결과값을 적는 방법이다. 두 방법의 차이점은 print 대신 파일 객체 fwrite 함수를 이용한 것이다.

 

 

프로그램의 외부에 저장된 파일을 읽는 여러 가지 방법

readline()

f = open("C:/Python/새파일.txt", 'r')

line = f.readline()

print(line)

f.close()

 

readlines()

f = open("C:/Python/새파일.txt", 'r')

lines = f.readlines()

for line in lines:

    print(line)

f.close()

readlines() 함수는 파일의 모든 라인을 읽어서 각각의 줄을 요소로 갖는 리스트로 리턴한다. 따라서 위의 예에서 lines["1 번째 줄입니다.\n","2 번째 줄입니다.\n",..., "10 번째 줄입니다.\n"]라는 리스트가 된다.

 

read()

f = open("C:/Python/새파일.txt", 'r')

data = f.read()

print(data)

f.close()

 

f.read()는 파일의 내용 전체를 문자열로 리턴

 

파일에 새로운 내용 추가하기

쓰기 모드('w')로 파일을 열 때 이미 존재하는 파일을 열 경우 그 파일의 내용이 모두 사라지게 된다. 하지만 원래 있던 값을 유지하면서 단지 새로운 값만 추가해야 할 경우 경우에는 파일을 추가 모드('a')로 열면 된다.

 

f = open("C:/Python/새파일.txt",'a')

for i in range(11, 20):

    data = "%d번째 줄입니다.\n" % i

    f.write(data)

f.close()

 

추가 모드로 파일을 열었기 때문에 새파일.txt라는 파일이 원래 가지고 있던 내용 바로 다음부터 결과값을 적기 시작한다.

 

with문과 함께 사용하기

with open("foo.txt", "w") as f:

f.write("Life is too short, you need python")

위와 같이 with문을 이용하면 with 블록을 벗어나는 순간 열린 파일 객체 f가 자동으로 close되어 편리하다. (with구문은 파이썬 2.5부터 지원됨)

 

sys 모듈로 입력 인수 주기

import sys

args = sys.argv[1:]

for i in args:

    print(i.upper(), end=' ')

문자열 관련 함수인 upper()를 이용하여 명령 행에 입력된 소문자를 대문자로 바꾸어 주는 간단한 프로그램이다. 도스창에서 다음과 같이 입력해 보자.

 

C:\~>python sys2.py life is too short, you need python

LIFE IS TOO SHORT, YOU NEED PYTHON

 

argv[0] : 프로그램명

argv[1:] : 인수요소


참조 : 점프 투 파이썬 (https://wikidocs.net/26)

'Programming > Python' 카테고리의 다른 글

Lambda함수  (0) 2017.05.10
사용자 입출력  (0) 2017.05.08
함수  (0) 2017.05.08
for 문  (0) 2017.05.08
while 문  (0) 2017.05.08

+ Recent posts