아파치 하둡 얀은 리소스 관리와 컴포넌트 처리를 분리한 하둡 2.0에 도입된 아파치 소프트웨어 재단의 서브 프로젝트입니다얀은 맵-리듀스의 차세대 기술로서 -리듀스의 확장성과 속도문제를 해소하기 위해 새로 개발된 프로젝트입니다얀 이전의 맵-리듀스에서는 4000노드 이상의 클러스터에서 시스템 확장에 문제가 발생하여 야후(Yahoo)에서 새로 설계하였습니다.

 

가장 큰 변화로는 얀 자체로 맵-리듀스를 구동할 수 있으며추가로 다른 분산 처리 프레임워크(Tez, HBase, Giraph, Spark )를 사용자의 인터페이스 개발만으로 구동이 가능하게 되었습니다.

 

 

하둡 2.0 스택 ]

 

(YARN)은 하둡 분산 파일 시스템(HDFS, Hadoop Distributed File System)의 상단에서 빅 데이터용 애플리케이션들을 실행하는 대용량 분산 운영체제 역할을 수행합니다(YARN)을 이용하면 안정적인 기반에서 배치 작업과 양방향 실시간 작업을 수행할 수 있습니다아파치 재단은 얀(YARN)을 -리듀스 버전 2(MRv2)’로 명명했습니다.

 

 

YARN의 구조

(YARN)은 크게 리소스 매니저(Resource Manager), 노드 매니저(Node Manager), 애플리케이션 마스터(Application Master), 컨테이너(Container)로 구성되어 있습니다.

 

① 리소스 매니저(Resource Manager)

클러스터 전체를 관리하는 마스터 서버의 역할을 담당하며응용 프로그램의 요청을 처리합니다리소스 매니저는 클러스터에서 발생한 작업을 관리하는 애플리케이션 매니저(Applications Manager)를 내장하고 있으며응용 프로그램들 간의 자원(resource) 사용에 대한 경쟁을 조율합니다여기서 말하는 자원이란 CPU, 디스크(disk), 메모리(memory)등을 의미합니다.

 

 노드 매니저(Node Manager)

노드당 하나씩 존재하며슬레이브 노드(slave node)의 자원을 모니터링(monitoring) 하고 관리하는 역할을 수행합니다노드 매니저는 리소스 매니저의 지시를 받아 작업 요구사항에 따라서 컨테이너를 생성합니다.

 

 애플리케이션 마스터(Application Master)

노드 매니저와 함께 번들로 제공되며작업당 하나씩 생성이 되며컨테이너를 사용하여 작업 모니터링과 실행을 관리합니다또한리소스 매니저와 작업에 대한 자원 요구사항을 협상하고작업을 완료하기 위한 책임을 가집니다.

 

 컨테이너(Container)

CPU, 디스크(Disk), 메모리(Memory) 등과 같은 속성으로 정의됩니다이 속성은 그래프 처리(Graph processing) MPI(Message Passing Interface: 분산 및 병렬 처리에서 정보의 교환에 대해 기술하는 표준)와 같은 여러 응용 프로그램을 지원하는데 도움이 됩니다모든 작업(job)은 결국 여러 개의 작업(task)으로 세분화되며각 작업(task)은 하나의 컨테이너 안에서 실행이 됩니다필요한 자원의 요청은 애플리케이션 마스터(Application Master)가 담당하며승인 여부는 리소스 매니저(Resource Manager)가 담당합니다컨테이너 안에서 실행할 수 있는 프로그램은 자바 프로그램뿐만 아니라커맨드 라인에서 실행할 수 있는 프로그램이면 모두 가능합니다.

 

 

[ YARN의 전체 구성도 ]

 

 

리소스 매니저(Resource Manager)의 구조

리소스 매니저는 클러스터마다 존재하며클러스터 전반의 자원 관리와 작업(task)들의 스케쥴링을 담당합니다리소스 매니저는 클라이언트로부터 애플리케이션 실행 요청을 받으면 그 애플리케이션의 실행을 책임질 애플리케이션 마스터(Application Master)를 실행합니다또한 클러스터 내에 설치된 모든 노드 매니저(Node Manager)와 통신을 통해서 각 서버마다 할당된 자원과 사용중인 자원의 상황을 알 수 있으며애플리케이션 마스터들과의 통신을 통해 필요한 자원이 무엇인지 알아내어 관리하게 됩니다.

 

리소스 매니저(Resource Manager) 내부에는 여러 개의 컴포넌트들이 존재하며스케쥴러(Scheduler), 애플리케이션 매니저(Application Manager), 리소스 트랙커(Resource Tracker) 세개의 메인 컴포넌트가 있습니다.

 

① 스케쥴러(Scheduler)

노드 매니저(Node Manager)들의 자원 상태를 관리하며 부족한 리소스들을 배정합니다스케쥴러는 프로그램의 상태를 검사하거나 모니터링 하지 않으며순수하게 스케쥴링 작업만 담당합니다스케쥴링이란 자원 상태에 따라서 작업(task)들의 실행 여부를 허가해 주는 역할만 담당하며그 이상의 책임은 지지 않습니다프로그램 오류나 하드웨어의 오류로 문제가 발생한 프로그램을 재 시작시켜주지 않으며프로그램에서 요구하는 리소스(CPU, Disk, 네트워크등)에 관련된 기능만 처리합니다.

 

 애플리케이션 매니저(Application Manager)

노드 매니저(Node Manager) 에서 특정 작업을 위해서 애플리케이션 마스터(Application Master)를 실행하고애플리케이션 마스터(Application Master)의 상태를 관리합니다여기서 애플리케이션 마스터(Application Master)라는 용어가 나오는데 얀에서 실행되는 하나의 작업(task)을 관리하는 마스터 서버를 말합니다.

 

③ 리소스 트랙커(Resource Tracker)

컨테이너(Container)가 아직 살아 있는지 확인하기 위해서, 애플리케이션 마스터(Applications Master) 재 시도 최대 횟수그리고 노드 매니저(Node Manager)가 죽은 것으로 간주 될 때까지 얼마나 기다려야 하는지 등과 같은 설정 정보를 가지고 있습니다.

 

리소스 매니저서브 컴포넌트 ]

 

 

노드 매니저(Node Manager)의 구조

노드 매니저는 노드(컴퓨터당 한 개씩 존재합니다해당 컨테이너(Application Container)의 리소스 사용량을 모니터링 하고관련 정보를 리소스 매니저에게 알리는 역할을 담당합니다애플리케이션 마스터(Application Master)와 애플리케이션 컨테이너(Application Container)로 구성되어 있습니다.

 

 애플리케이션 마스터(Application Master)

하나의 프로그램에 대한 마스터 역할을 수행하며, 스케쥴러로부터 적절한 애플리케이션 컨테이너(Application Container)를 할당 받고프로그램 실행 상태를 모니터링하고 관리합니다.

 

 애플리케이션 컨테이너(Application Container)

프로그램에 할당된 자원을 나타냅니다.

 

 

얀의 작업 순서

다음 그림은 얀 클러스터(YARN Cluster)에서 응용 프로그램이 실행되는 과정을 보여줍니다.


 

[ 얀의 작업 순서 ]


① 클라이언트는Application Master 자체를 실행하는 필요한 데이터를 포함하는 응용프로그램을 제출한다.


② Resource Manager는 Container 할당을 책임지는 Application Master을 시작합니다.


③ Application Master가 Resource Manager에 등록되고 클라이언트가 Resource Manager과 통신할 수 있게 된다.


④ Application Master는 resource-request프로토콜을 통해 Resource Manager에게 적절한 리소스의 Container 를 요청한다.


⑤ Container 가 성공적으로 할당되면, Application Master는 Container 실행 스펙을 Node Manager에게 제공하여  Container 를 실행시킨다. 실행 스펙은 Container가 Application Master와 통신하기 위해 필요한 정보를 포함하고 있다.


⑥ 응용프로그램 코드는 Container 에서 실행되고 진행률, 상태 등의 정보를 응용프로그램-스펙 프로토콜을 통해 Application Master 에게 제공한다.


⑦ 클라이언트는 응용프로그램 실행 중의 상태, 진행률 등을 얻기 위해 응용프로그램-스팩 프로토콜을 통해 Application Master와 직접 통신한다.


⑧ 응용프로그램이 완료되고 모든 필요한 작업이 종료되면, Application Master는 Resource Manager의 등록을 해제하고 자신의 컨테이너를 다른 용도로 사용할 수 있도록 종료한다.

 

출처 : http://ryufree.tistory.com/m/229?category=252660

http://skccblog.tistory.com/1884

'Hadoop ecosystem > YARN' 카테고리의 다른 글

YARN과 MapReduce  (0) 2018.04.11
노드 매니저 구성 요소  (0) 2018.04.10
리소스 매니저 구성요소  (0) 2018.04.10
YARN 개념  (0) 2018.04.10

Hadoop이 버전1에서 버전2로 올라갔지만, 두 버전 모두 MapReduce 엔진을 주요 모듈로 가지고 있다.

MapReduce도 HDFS와 비슷한 방식으로 Job을 제출하고, Job의 진행 상황을 추적하는 로직을 가지고 있다.

네임노드가 있는 서버에는 잡 트래커(Job Tracker)가 있고,

데이터노드가 있는 장비에는 태스크 트래커(Task Tracker)가 배치된다.

비슷하게 잡 트래커는 여러 태스크 트래커에 효율적으로 임무를 할당하고, 결과를 수신하여 병합하는 역할을 한다.


하지만 MapReduce 엔진은 Hadoop 0.23버전에서 대폭 변경이 된다.

그래서 MapReduce 2.0 혹은 MRV2라고 불렀었는데,

나중에 YARN(Yet Another Resource Negotiator)이라는 이름이 지어졌다. 

(yarn은 '실'이라는 뜻이 있어서, YARN을 표현할 때 털실뭉치를 많이 사용한다)

YARN은 기존의 Job Tracker를 리소스 매니저(Resource Manager)어플리케이션 마스터(Application Master)로 분리하였다.


리소스 매니저들은 클러스터 내의 컴퓨팅 리소스들을 어플리케이션에 할당해 주는 역할을 한다.

어플리케이션 마스터는 컴퓨팅 리소스를 할당받아 어떤 어플리케이션을 실행하고, 라이프 사이클을 관리한다. 


이 어플리케이션은 전통적인 MapReduce 뿐 아니라, 다른 형태의 것도 가능해서 훨씬 더 융통성이 생겼다.

정리하면 리소스 매니저는 장비마다 있는 노드 매니저(Node Manager)를 통해 리소스를 관리하고, 어플리케이션 마스터는 어플리케이션 마다 할당되어 생성되며, 리소스 매니저와의 협상을 통해 실제 컴퓨팅 리소스를 사용한다.




이러한 변화는 아래의 Hadoop 스택을 통해서도 볼 수 있다. 즉 이제 MapReduce는 유일한 프로세싱 방법이 아니라, DAG(Directed Acyclic Graph, 방향성 비순환 그래프)로 일반화된 프로세싱 방법 위에서 돌아가는 하나의 어플리케이션일 뿐이다.



결론적으로 YARN은 Hadoop의 클러스터 컴퓨팅 파워를 더 향상시켰다.


확장성 : 매우 큰 동적인 클러스터를 효율적으로 관리할 수 있다


호환성 : YARN은 기존의 MapReduce 어플리케이션과의 하위 호환성을 제공한다. 그래서 Hadoop 1.0에서 작성된             프로그램은 Hadoop 2.0에서도 동작한다.


효율성 : 클러스터의 각 장비의 컴퓨팅 능력을 최대로 활용하여, 컴퓨팅 용량을 증가시켰다.


다양한 워크플로우 지원 : MapReduce 뿐 아니라 그래프 프로세싱, 반복(Iterative) 모델링, 머신러닝 등의

다른 워크 플로우도 지원한다.



'Hadoop ecosystem > YARN' 카테고리의 다른 글

YARN과 MapReduce  (0) 2018.04.11
노드 매니저 구성 요소  (0) 2018.04.10
리소스 매니저 구성요소  (0) 2018.04.10
YARN 구조  (0) 2018.04.10

Hadoop의 기본 모듈과 생태계 

Hadoop은 4개의 기본 모듈로 구성된다. Hadoop Common, HDFS, MapReduce, YARN이 그것들이다.


Hadoop Common : 다른 모듈이 사용할 수 있는 라이브러리와 유틸리티 모음
HDFS : 데이터를 분산하고 안전하게 저장하는 파일 시스템
YARN : 리소스 관리 플랫폼으로 클러스터의 컴퓨팅 리소스를 관리하고 스케쥴링
MapReduce : 큰 데이터 집합에 대해 분산 컴퓨팅을 하는 모듈. (Map(), Reduce()로 나뉨)


HBase, Pig, Hive 등의 모듈들도 다 이 네개의 기본 모듈 위에서 동작하는 것들이다.






출처 : http://ddmix.blogspot.kr/2015/11/hadoop-1-stack-overview.html


빅데이터를 접하기 시작하면서 자주듣게 되는 용어가 있습니다. 맵/리듀스 라는 용어인데요, MR이라고도 많이 쓰구요, 빅데이터 처리에는 늘 맵리듀스 개념이 들어가죠.

그럼, 빅데이터 처리의 기본이되는 맵리듀스란 무엇인지 자세히 알아볼께요.

일단 맵(Map) 이라는 것은 지도? 아니구요, :) 데이터를 담아두는 자료 구조 중의 하나입니다.
맵은 키와 밸류라는 두개의 값을 쌍으로 가지고 있는 형태입니다.
수학시간에 좌표를 표시할때 순서쌍이라고 하죠, (x,y) 이렇게 하던 바로 그 개념입니다.
여기서 x가 키이고, y가 밸류 즉 값인거죠.
그리고 함수 f(x) => y 도  생각나시죠? x를 알면 y를 알 수 있는 구조로 관리 됩니다.   



리듀스(Reduce)는 이 맵을 정리해 나가는(줄여나가는) 방법이라고 할 수 있습니다.
키를 기준으로 (같은 키 값을 가진 맵들의) 개수를 센다든지, 같은 키를 기준으로 밸류를 모두 더하거나,
평균을 내거나 하는 것들이 있습니다.
글 마지막 부분에 더 많은 예제가 있으니까 끝까지 꼭 읽어보세요. :)

그럼... 맵리듀스의 가장 쉬운 예제를 살펴 볼께요.

말로만은 이해가 어려우니 예를 들어볼께요.
맵리듀스 예제로 가장많이 등장하는 wordcount를 가지고 설명할께요.
wordcount는 어떤 글에 등장하는 단어들의 개수를 세는 작업이구요,
java 프로그래밍을 처음 배울때 "hello world"를 찍어보듯 맵리듀스를 처음 배울때 짜보는 프로그램 입니다.

단어 수를 세기 위한 맵은 일단 대상이 되는 글이 담긴 파일을 한 줄 한 줄 읽어가면서 단어를 잘라 만듭니다.
그리고 밸류는 무조건 1로 해서 차곡차곡 나열만 하는거죠.

그리고 나면 하둡 같은 빅데이터 처리 프레임워크에서 suffling이라는 작업을 통해서 비슷한것끼리 모아가면서 정리를 해줍니다.

이어서 리듀스 프로그램이 돌아가게 되는데요. 이때 키가 같은 경우(단어가 같은)의 밸류(1로 입력한)를 더하라고 프로그래밍 하면 단어별로 출현 빈도가 집계되는 것입니다.
 


그런데,,, 도대체 왜 빅데이터에서 맵리듀스의 개념을 사용하는 걸까요?

빅데이터는 양이 워낙 많기 때문에 처리 프로세스는 최대한 단순하게 만들어야 합니다.
그래야 수많은 서버에 흩어서 병렬로 처리할 수 있거든요.
처리의 순서가 중요하다든지 실패했을때 다시하려면 몇단계 전으로 돌아가야 한 다든지 하면 대략난감해지니까요.
이를 위해서는 기준이 되는 값은 하나!이어야 프로세스가 단순합니다.
그래서 기준이되는 값인 키가 하나인 맵 구조를 선택한거구요.
키에 대한 실제 처리는 밸류(값)를 가지고 하게 되는데 여기에 수행 하는 연산도 역시 단순해야 합니다.
어떤식이냐면 교환법칙과 결합법칙이 성립해야 하죠.
임의의 서버에서 임의의 순서로 두 개의 맵을 선택해서 연산을 수행하더라도
최종결과는 늘 같아야 병렬처리가 가능 하니까요.

 
이런 조건을 만족하기 위해서 모든 사용자들의 처리를 하나의 틀 안에서 단순화 하는거죠.

맵 만들고, 리듀스 하고... 다시 맵 만들고 리듀스하고... 하는 식으로요.

이 틀을 어기지만 않는다면 수백대의 서버가 일을 나누어 해도 오류없이 처리가 가능하게 됩니다.
그리고 또 신기한것이 이런 틀 안에서도 맵리듀스를 잘 이용하거나, 여러단계를 반복 시키면, 참 다양하고 꽤 복잡한 작업도 처리가 가능하게 되니, 충분히 분할만 해나간다면 본질적으로 복잡한 작업이란 없는건지도 모르겠습니다. :)

개념을 이해하기 위해서, 맵리듀스를 사용하는 패턴을 조금 더 자세하게 살펴볼께요.

예를 들어 쇼핑몰 사용자의 사용 로그가 아래와 같이 남는다고 가정 해볼께요.


사용일자:사용자아이디:행동유형:관련금액
-----------------------------------------
20150305 0930:chulsoo:addToCart:0
20150305 1002:chulsoo:buy:33000
...

쇼핑몰을 방문한 사용자를 집계하고 싶을때는
로그를 한 줄씩 읽으면서 (사용자아이디, 1) 형태로 맵을 만듭니다.
리듀스 할때 밸류를 서로 더해 주셔도 되고,

key의 개수를 세어주는 함수가 제공된다면 그걸 그냥 호출하면 됩니다.

대략 적으로 코드 형태를 보여드리자면 이런 식입니다.

일종의 pseudo code랄 수 있겠네요. :)


map(사용자아이디, 1)
reduceByKey((a,b) => a+b)


맵을 만들때는 로그의 한 줄을 읽어서 적당한 부분을 잘라내는 파싱 작업이 먼저 이루어져야 하죠. 그래야 map에 저렇게 사용자 아이디를 넣어줄 수 있으니까요.
그리고 reduceByKey에 나오는 a,b는 두 개의 맵을 들고 연산을 수행할때, 각각의 밸류값 입니다.

키는 reduceByKey라는 이름이 알려주듯 같은 키 값을 가진 맵들끼리 계속해서 연산을 해서 맵의 숫자를 줄이는 거죠.

최종적으로는 그 키를 가지는 맵이 한 개 남을때 까지요.

그러니까 키는 정해줄 필요가 없고 첫번째 맵의 밸류 a와 두번째 맵의 밸류 b를 가지고

어떤 연산을 수행할 것인지 (위 예제에서는 더하기(+) 네요) 만 정해 주면 됩니다.

•    특정행동이 일어난 건수를 행동 유형별로 알고 싶을때는
로그를 한 줄씩 읽으면서 (행동유형, 1) 형태로 맵을 만들구요,
리듀스처리에서는 키가 같은 두개의 맵이 만날때마다 밸류 값을 서로 더해주는 함수를 선언하면 됩니다.
map(행동유형, 1)
reduceByKey((a,b) => a+b)

•    당일에 특정행동에 관련된 금액의 총계를 뽑고 싶을때는
사용일시가 당일인 건을 한 줄씩 읽으면서 (행동유형, 관련금액) 형태로 맵을 만들고,
리듀스 처리에서는 키가 같은 두 개의 맵에 만날때마나 밸류값을 서로 더해주는 함수를 선언하면 됩니다.
map(행동유형, 관련금액)
reduceByKey((a,b) => a+b)

•    사용자별 행동별 건수와 같이 두가지 이상의 조건을 조합하고 싶은 경우에는요,
그 조건을 적절하게 이어붙여서 키 값을 만드세요.
한 줄씩 읽으면서 (사용자아이디_행동유형, 1) 형태로 맵을 만드는 것 처럼요.
그리고 나서 리듀스 처리를 하고,
이후에 이걸 다시 파싱해서 DB에 넣어서 조회하거나 하는 방식으로 사용할 수 있습니다.
map (사용자아이디_행동유형, 1)
reduceByKey((a,b) => a+b)

패턴이 보이시나요? :)
로그에서 원하는 조건 부분을 잘라서 맵에 넣으시구요, 밸류에는 개수를 셀때는 1을 원하는 값이 있을때는 그 값을 넣어줍니다. 그리고 나서 키를 기준으로 리듀스를 하시면 됩니다.


출처: http://cskstory.tistory.com/entry/맵리듀스-MapReduce-이해하기 [아는 만큼]



'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
MapReduce 2  (0) 2017.05.03
Map Reduce 1  (0) 2017.05.02

Actions

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacementnum, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n[ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path
(Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path
(Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.



출처 : https://spark.apache.org/docs/latest/programming-guide.html#Actions

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Transformations

Transformation

Meaning

map(func)

func를 통해서 새로은 데이터 셋을 만든다.

filter(func)

func의 결과와 일치하면 true/ 그렇지 않으면 false를 반환하여 새로운 데이터셋을 형성할 때 true값만 모아서 만든다.

flatMap(func)

map과 비슷하지만 데이터셋이 list들로 반환된다.

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample(withReplacementfractionseed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command[envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.


출처 : https://spark.apache.org/docs/latest/programming-guide.html#transformations

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

Map과 FlatMap은 문자열과 같은 것들을 분리하는 역할을 한다. 각 차이점에 대해서 알아보자.


Map Function

split 하면 list of list로 반환한다. 그래서 접근하기가 까다롭다.




flatMap

split시에 list로 반환 하기때문에 접근하기 쉽다.


text = sc.textFile("short")

wc = text.flatMap(lambda line : line.split()).map(lambda word : (word.1)).reduceByKey(lamdba c1, c2 : c1+c2)

wc.collect()



'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
ReduceByKey vs. GroupByKey  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

ReduceByKey

Key값을 가져야한다.

같은 node의 같은 key값 기준으로 values를 미리 병합하여 suffling한다.

reduceByKey에 function을 같이 넘겨 줄수 있다.

accociate / commutative (+, x만 사용) 


countByValue : return값이 dictionary


Picture

GroupByKey 

각 노드에서 병합하지않고 shuflling한후에 병합하기 때문에 네트워크부하가 많이 걸려 효율성이 떨어진다.(그래서 추천X)

groupbyKey 후에 function을 적용해서 값을 구한다.

K는 그대론데V는 iterableObject로 결과가 나온다.

Picture


그림출처 : http://www.ruxizhang.com/blog/spark-difference-between-reducebykey-groupbykey

'Hadoop ecosystem > Spark' 카테고리의 다른 글

Actions function  (0) 2017.05.11
Transformations function  (0) 2017.05.11
Map vs. flatMap  (0) 2017.05.11
RDD(Resilient Distributed DataSet)  (0) 2017.05.10

+ Recent posts