MapReduce연산이 진행되는 DataFlow에 대해 살펴 보겠습니다.




1. Input Files

MapReduce Task를 위한 데이터는 inputFile에 저장되어 있습니다. 그리고 이 input file은 HDFS에 저장 되어있습니다. 이 파일의 포맷은 임시적이며, line-based log files 과 binary format 을 사용할 수 있습니다.


2. InputFormat

InputFormat은 input file이 어떻게 분할되고 어떻게 읽어지는 가를 정의합니다.

Hadoop의 Job은 Map task와 Reduce task로 나누어집니다.


그리고 이 task들은 Yarn에 의해 스케쥴링되고 클러스터안의 노드위에서 실행됩니다. 만약 task가 fail되면 자동으로 다른 노드로 rescheduling합니다.


InputFormat은 입력을 위해서 파일이나 다른 객체를 선택하여 getSplits()를 통하여 List<inputSplit>을 생성합니다. inputSplit를 Application Master에 전달하면 Map Task가 createRecordReader()를 실행시켜 RecordReader를 만듭니다.

Hadoop은 각각의 Split마다 하나의 MapTask를 생성하게 되는데, MapTask는 Split을 각각의 record로 나누어 사용자 정의 map Function을 적용합니다.

2.1. InputSplits

inputSplit은 inputFormat에 의해 생성되며, 데이터를 각각의 Mapper에 맞는 논리 형식으로 분할합니다. 예를들어서 HDFS의 File size는 128MB인데, 파일크기가 150MB라면 Block을 2개 읽어와서 논리적으로 시작과 끝을 지정합니다.


2.2. RecordReader

InputSplit에서 분할된 레코드들을 Mapper에 적합한 Key-Value쌍으로 변환합니다. 기본적으로 TextInputFormat를 사용하여 Key-Value쌍으로 변환합니다. InputFormat은 기본적으로 TextInputFormat를 지원하는데, TextInputFormat은 text file을 읽을때 \n까지를 한줄로 인식하고 한줄 단위로 Split을 만드는 기능을 합니다.

그리고 Record Reader은 InputSplit에서 유니크한 수인 byte offset을 키로하고, 각 라인을 value로 해서 하나의 새로운 Key-Value쌍을 만듭니다. 그리고 이 Key-Value쌍을 Data Processing을 위한 Mapper로 전송하게 됩니다.


3. Mapper

RecordReader를 통해서 입력된 record를 완전히 새로운 Key-Value쌍으로 만드는 프로세스 입니다. Mapper의해 발생된 출력은 HDFS에 바로 저장되지 않고 임시 데이터로 저장이고, 이 출력은 곧 Combiner에 입력으로 들어가게됩니다.

4. Combiner

‘Mini-reducer’라고 알려져 있는 Combiner는 Mapper의 출력을 local에서 Reduce를 처리합니다. local에서 각각에 대하여 reduce연산을 수행하게 되면 이후 진행되는 shuffling이나 Sorting Reducer작업을 위해 데이터를 전송할때 생기는 부하를 줄여주는 효과가 있습니다.

5. Shuffling and Sorting

Reduce에 입력으로 주기위해 Map연산이 끝난 데이터를 Reduce연산에서 생기는 네트워크 트래픽을 최소화 하기 위해서 Sorting하고 같은것으로 모으는 작업입니다.


6. Reducer

Mapper에 의해 생성된 key-Value쌍을 가지고 각각의 reduce연산을 통하여 최종 결과물을 출력합니다. 이 최종결과물은 HDFS에 저장됩니다.

7. RecordWriter

Reduce 연산이 끝난 Key-Value쌍을 출력 파일에 씁니다.

8. OutputFormat

OutputFormat에 의해 결정된 RecordWrite가 출력된 Key-Value쌍을 file에 씁니다. OutputFormat instances은 HDFS또는 그 local disk가 사용하는 하둡에의해 제공됩니다. reducer의 최종 출력은 OutputFormat instances에 의해 HDFS에 저장됩니다.


'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

WordCount  (0) 2018.04.14
Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class WordCount {
    
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>//KeyIn, ValueIn, KeyOut, ValueOut 
 
        //IntWritable : Integer형이면서 Writable, Comparable한 자료형
        private final static IntWritable one = new IntWritable(1);
 
        private Text word = new Text();
                        //KeyIn, ValueIn
        
        /*
        On the top of the Crumpetty Tree
        The Quangle Wangle sat,
        But his face you could not see,
        On account of his Beaver Hat.
        (0, On the top of the Crumpetty Tree)
        (33, The Quangle Wangle sat,)
        (57, But his face you could not see,)
        (89, On account of his Beaver Hat.)
        */
        
        public void map(Object key, Text value, Context context)  
                        //{ byte단위 offset, String, Context }
                            throws IOException, InterruptedException {
 
        //value를 " " 단위로 Tokenizing한다.
        StringTokenizer itr = new StringTokenizer(value.toString());
 
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                
                //KeyOut, ValueOut
                context.write(word, one);
            }
 
        }
    }
    public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> { //KeyIn, ValueIn, KeyOut, ValueOut 
 
    private IntWritable result = new IntWritable();
 
                        //KeyIn, ValueIn
    public void reduce(Text key, Iterable<IntWritable> values, Context context
                        ) throws IOException, InterruptedException {
            int sum = 0;
            
            for (IntWritable val : values) {
                sum += val.get();
            }
            
            result.set(sum);
            
            //KeyOut, ValueOut 
            context.write(key, result);
        }
    }
 
    public static void main(String[] args) throws Exception {
        
        //initialize configuration
        Configuration conf = new Configuration();
 
        //create Job Instance
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        
        //Setting Classes using Job Instance
        job.setMapperClass(TokenizerMapper.class);
 
        //같은 노드내에서 reduce작업을 수행함으로써 전송되는 데이터의 갯수를 줄인다.
        job.setCombinerClass(IntSumReducer.class);
 
        //input은 shuffle을 통해 각 노드에서 key별로 (key,[1,2,3,1,1,2])가 된 상태
        job.setReducerClass(IntSumReducer.class);
 
        //job을통해 최종 Output될 클래스 세팅
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //해당 경로를 job의 configuration.mapred.input.dir에 넣어준다.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        //Output dir을 설정한다.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
        //excute the Application
        //Blocking Function(실행이 완료될때 까지 return되지않음)
        //Submit the job, then poll for progress until the job is complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
        //Job이 실행되면 내부적으로 InputSplit과 RecoreReader를 실행해서 
        //Map에 적합한 {Key,Value}쌍을 만든다.
    }
}
 
cs





Sample text-files as input:


$ bin/hadoop fs -ls /user/joe/wordcount/input/

/user/joe/wordcount/input/file01

/user/joe/wordcount/input/file02


$ bin/hadoop fs -cat /user/joe/wordcount/input/file01

Hello World Bye World


$ bin/hadoop fs -cat /user/joe/wordcount/input/file02

Hello Hadoop Goodbye Hadoop


Run the application:

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output


Output:


$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000

Bye 1

Goodbye 1

Hadoop 2

Hello 2

World 2


'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

DataFlow  (0) 2018.04.14
Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04

Indexing은 조회쿼리에 대한 연관문서를 빠르게 찾기위해 사용한다.


Inverted index

각 단어가 존재하는 문서번호를  저장하고 있는 인덱스
Inverted Index
WordDocuments
theDocument 1, Document 3, Document 4, Document 5, Document 7
cowDocument 2, Document 3, Document 4
saysDocument 5
mooDocument 7

검색하고자 하는 단어가 입력되었을때, 해당문서에 단어가 존재하는지 안하는지 만을 알수있다.


The forward index

문서에 존재하는 각 단어들을 저장하고 있는 인덱스
Forward Index
DocumentWords
Document 1the,cow,says,moo
Document 2the,cat,and,the,hat
Document 3the,dish,ran,away,with,the,spoon


'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

DataFlow  (0) 2018.04.14
WordCount  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04

Mapper를 호출하기전에 일반 텍스트파일을 Key Value 값으로 맞춰줘야한다. 그러기 위해서 InputFormat을 맞춰주는 전처리 작업이 필요하다. 이를위해서 MapReduce는 2가지 방법을 제공해주고있다.


TextInputFormat


입력값이 일반텍스트일때, 라인단위로 쪼갠다. Key는 파일의 라인수가되고, Values는 해당라인의 텍스트가 된다


참조 : https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapred/TextInputFormat.html

KeyValueTextInputFormat

입력값이 일반텍스트일때,  파일을 라인단위로 쪼개는데, 특정 구분자를 기준으로 쪼갠다. 만약 해당구분자가 존재하지않는경우 value는 빈값으로 리턴한다. 


참조 : https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapred/KeyValueTextInputFormat.html

'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

WordCount  (0) 2018.04.14
Indexing  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
MapReduce 2  (0) 2017.05.03

빅데이터를 접하기 시작하면서 자주듣게 되는 용어가 있습니다. 맵/리듀스 라는 용어인데요, MR이라고도 많이 쓰구요, 빅데이터 처리에는 늘 맵리듀스 개념이 들어가죠.

그럼, 빅데이터 처리의 기본이되는 맵리듀스란 무엇인지 자세히 알아볼께요.

일단 맵(Map) 이라는 것은 지도? 아니구요, :) 데이터를 담아두는 자료 구조 중의 하나입니다.
맵은 키와 밸류라는 두개의 값을 쌍으로 가지고 있는 형태입니다.
수학시간에 좌표를 표시할때 순서쌍이라고 하죠, (x,y) 이렇게 하던 바로 그 개념입니다.
여기서 x가 키이고, y가 밸류 즉 값인거죠.
그리고 함수 f(x) => y 도  생각나시죠? x를 알면 y를 알 수 있는 구조로 관리 됩니다.   



리듀스(Reduce)는 이 맵을 정리해 나가는(줄여나가는) 방법이라고 할 수 있습니다.
키를 기준으로 (같은 키 값을 가진 맵들의) 개수를 센다든지, 같은 키를 기준으로 밸류를 모두 더하거나,
평균을 내거나 하는 것들이 있습니다.
글 마지막 부분에 더 많은 예제가 있으니까 끝까지 꼭 읽어보세요. :)

그럼... 맵리듀스의 가장 쉬운 예제를 살펴 볼께요.

말로만은 이해가 어려우니 예를 들어볼께요.
맵리듀스 예제로 가장많이 등장하는 wordcount를 가지고 설명할께요.
wordcount는 어떤 글에 등장하는 단어들의 개수를 세는 작업이구요,
java 프로그래밍을 처음 배울때 "hello world"를 찍어보듯 맵리듀스를 처음 배울때 짜보는 프로그램 입니다.

단어 수를 세기 위한 맵은 일단 대상이 되는 글이 담긴 파일을 한 줄 한 줄 읽어가면서 단어를 잘라 만듭니다.
그리고 밸류는 무조건 1로 해서 차곡차곡 나열만 하는거죠.

그리고 나면 하둡 같은 빅데이터 처리 프레임워크에서 suffling이라는 작업을 통해서 비슷한것끼리 모아가면서 정리를 해줍니다.

이어서 리듀스 프로그램이 돌아가게 되는데요. 이때 키가 같은 경우(단어가 같은)의 밸류(1로 입력한)를 더하라고 프로그래밍 하면 단어별로 출현 빈도가 집계되는 것입니다.
 


그런데,,, 도대체 왜 빅데이터에서 맵리듀스의 개념을 사용하는 걸까요?

빅데이터는 양이 워낙 많기 때문에 처리 프로세스는 최대한 단순하게 만들어야 합니다.
그래야 수많은 서버에 흩어서 병렬로 처리할 수 있거든요.
처리의 순서가 중요하다든지 실패했을때 다시하려면 몇단계 전으로 돌아가야 한 다든지 하면 대략난감해지니까요.
이를 위해서는 기준이 되는 값은 하나!이어야 프로세스가 단순합니다.
그래서 기준이되는 값인 키가 하나인 맵 구조를 선택한거구요.
키에 대한 실제 처리는 밸류(값)를 가지고 하게 되는데 여기에 수행 하는 연산도 역시 단순해야 합니다.
어떤식이냐면 교환법칙과 결합법칙이 성립해야 하죠.
임의의 서버에서 임의의 순서로 두 개의 맵을 선택해서 연산을 수행하더라도
최종결과는 늘 같아야 병렬처리가 가능 하니까요.

 
이런 조건을 만족하기 위해서 모든 사용자들의 처리를 하나의 틀 안에서 단순화 하는거죠.

맵 만들고, 리듀스 하고... 다시 맵 만들고 리듀스하고... 하는 식으로요.

이 틀을 어기지만 않는다면 수백대의 서버가 일을 나누어 해도 오류없이 처리가 가능하게 됩니다.
그리고 또 신기한것이 이런 틀 안에서도 맵리듀스를 잘 이용하거나, 여러단계를 반복 시키면, 참 다양하고 꽤 복잡한 작업도 처리가 가능하게 되니, 충분히 분할만 해나간다면 본질적으로 복잡한 작업이란 없는건지도 모르겠습니다. :)

개념을 이해하기 위해서, 맵리듀스를 사용하는 패턴을 조금 더 자세하게 살펴볼께요.

예를 들어 쇼핑몰 사용자의 사용 로그가 아래와 같이 남는다고 가정 해볼께요.


사용일자:사용자아이디:행동유형:관련금액
-----------------------------------------
20150305 0930:chulsoo:addToCart:0
20150305 1002:chulsoo:buy:33000
...

쇼핑몰을 방문한 사용자를 집계하고 싶을때는
로그를 한 줄씩 읽으면서 (사용자아이디, 1) 형태로 맵을 만듭니다.
리듀스 할때 밸류를 서로 더해 주셔도 되고,

key의 개수를 세어주는 함수가 제공된다면 그걸 그냥 호출하면 됩니다.

대략 적으로 코드 형태를 보여드리자면 이런 식입니다.

일종의 pseudo code랄 수 있겠네요. :)


map(사용자아이디, 1)
reduceByKey((a,b) => a+b)


맵을 만들때는 로그의 한 줄을 읽어서 적당한 부분을 잘라내는 파싱 작업이 먼저 이루어져야 하죠. 그래야 map에 저렇게 사용자 아이디를 넣어줄 수 있으니까요.
그리고 reduceByKey에 나오는 a,b는 두 개의 맵을 들고 연산을 수행할때, 각각의 밸류값 입니다.

키는 reduceByKey라는 이름이 알려주듯 같은 키 값을 가진 맵들끼리 계속해서 연산을 해서 맵의 숫자를 줄이는 거죠.

최종적으로는 그 키를 가지는 맵이 한 개 남을때 까지요.

그러니까 키는 정해줄 필요가 없고 첫번째 맵의 밸류 a와 두번째 맵의 밸류 b를 가지고

어떤 연산을 수행할 것인지 (위 예제에서는 더하기(+) 네요) 만 정해 주면 됩니다.

•    특정행동이 일어난 건수를 행동 유형별로 알고 싶을때는
로그를 한 줄씩 읽으면서 (행동유형, 1) 형태로 맵을 만들구요,
리듀스처리에서는 키가 같은 두개의 맵이 만날때마다 밸류 값을 서로 더해주는 함수를 선언하면 됩니다.
map(행동유형, 1)
reduceByKey((a,b) => a+b)

•    당일에 특정행동에 관련된 금액의 총계를 뽑고 싶을때는
사용일시가 당일인 건을 한 줄씩 읽으면서 (행동유형, 관련금액) 형태로 맵을 만들고,
리듀스 처리에서는 키가 같은 두 개의 맵에 만날때마나 밸류값을 서로 더해주는 함수를 선언하면 됩니다.
map(행동유형, 관련금액)
reduceByKey((a,b) => a+b)

•    사용자별 행동별 건수와 같이 두가지 이상의 조건을 조합하고 싶은 경우에는요,
그 조건을 적절하게 이어붙여서 키 값을 만드세요.
한 줄씩 읽으면서 (사용자아이디_행동유형, 1) 형태로 맵을 만드는 것 처럼요.
그리고 나서 리듀스 처리를 하고,
이후에 이걸 다시 파싱해서 DB에 넣어서 조회하거나 하는 방식으로 사용할 수 있습니다.
map (사용자아이디_행동유형, 1)
reduceByKey((a,b) => a+b)

패턴이 보이시나요? :)
로그에서 원하는 조건 부분을 잘라서 맵에 넣으시구요, 밸류에는 개수를 셀때는 1을 원하는 값이 있을때는 그 값을 넣어줍니다. 그리고 나서 키를 기준으로 리듀스를 하시면 됩니다.


출처: http://cskstory.tistory.com/entry/맵리듀스-MapReduce-이해하기 [아는 만큼]



'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
MapReduce 2  (0) 2017.05.03
Map Reduce 1  (0) 2017.05.02


대용량의 빅데이터를 분석해야 할 때 single machie 메모리 기반인 프로그램들은  메모리 full 나서 아예 데이터를 load를 못하거나겨우 load를 했어도 분산병렬처리가 안되어서 연산 시간이 매우 오래걸리는 경우로 힘들다.


MapReduce로 분산병렬처리를 할때Hadoop MapReduce가 Java 로 되어있다보니 java나 python을 통하여 driver , mapper, reducer를 프로그래밍하게 된다. 하지만 이는 Data Scientist입장에서는 배우기 힘든 언어이고, 따라서 분석의 비효율을 낳게 된다. 그래서 나온것이 페이스북의 Hive와 야후의 Pig이다. 이는 각각 SQL과 Perl이랑 비슷하여  Data Scientist들에게 쉬운 분석환경을 제공한다. 그 중 Hive는 Structued Data를 분석하는 용으로 쓰인다. Hive를 쓸 줄 알면 대용량 데이터 전처리하는데 아주 요긴하다. SQL을 알고 있으면 배우기도 쉽기에 기존 DW를 Big Data로 porting할 때 Hive를 사용하면 생산성이 올라간다.그리고 local에서와 HDFS에서 DBMS를 옮기기 위해서는 sqoop을 이용해서 옮긴다. 그럼 자세히 알아보도록하자. 



아파치 하이브(Apache Hive)

하둡에서 동작하는 데이터 웨어하우스(Data Warehouse) 인프라 구조로서 데이터 요약질의 및 분석 기능을 제공한다초기에는 페이스북에서 개발되었지만 넷플릭스등과 같은 회사에서 사용되고 있으며 개발되고 있다.

아파치 하이브는 아파치 HDFS이나 아파치 HBase와 같은 데이터 저장 시스템에 저장되어 있는 대용량 데이터 집합들을 분석한다. HiveQL 이라고 불리는 SQL같은 언어를 제공하며 맵리듀스의 모든 기능을 지원한다쿼리를 빠르게 하기위해 비트맵 인덱스를 포함하여 인덱스 기능을 제공한다.

기본적으로 하이브는 메타데이터를 내장된 아파치 더비(Derby) 데이터 베이스 안에 저장한다그렇지만 MySQL과 같은 다른 서버/클라이언트 데이터 베이스를 사용할 수 있는 선택권을 제공한다현재 TEXTFILE, SEQUENCEFILE, ORC 그리고 RCFILE등 4개의 파일 포맷을 지원한다.


피그(Pig)

대용량 데이터 집합을 분석하기 위한 플랫폼으로 아파치 하둡(Apache Hadoop)을 이용하여 맵리듀스(MapReduce)를 사용하기 위한 높은 수준의 스크립트 언어와 이를 위한 인프라로 구성되어 있다.


현재, 피그의 인프라 구조 계층은 컴파일러로 구성되어 있으며 대용량 병렬처리를 위한 맵리듀스 프로그램의 데이터 변환 순서를 만든다. 피그의 언어 계층은 현재 피그 라틴이라 불리는 텍스트 기반의 언어로 이루어져 있다. 이것의 주요 특징은 프그래밍하기가 쉬우며, 최적화 할 수 있는 방법을 제공하고 사용자가 특수 목적을 위한 자신의 함수를 만들 수 있는 확장성을 제공한다는 것이다.


피그는 처음에 야후 연구소에서 2006년경에 매우 커다란 데이터 집합들을 처리하기 위한 맵리듀스 쟙들을 계획하지 않고 필요할 때 생성하고 처리하기 위한 방법을 연구하는 목적으로 개발되었다. 2007년에 아파치 소프트웨어 재단으로 옮겨졌다.


스쿱(Sqoop)

구조화된 관계형 데이터 베이스와 아파치 하둡간의 대용량 데이터들을 효율적으로 변환하여 주는 명령 줄 인터페이스(Command-Line Interface) 애플리케이션이다


 오라클 또는 MySQL같은 관계형 데이터 베이스에서 하둡 분산 파일 시스템으로 데이터들을 가져와서 그 데이터들을 하둡 맵리듀스로 변환을 하고그 변환된 데이터들을 다시 관계형 데이터 베이스로 내보낼 수 있다


 스쿱은 데이터의 가져오기와 내보내기를 맵리듀스를 통해 처리하여 장애 허용 능력뿐만 아니라 병렬 처리가 가능하게 한다스쿱은 2012년 3월 최상위 아파치 프로젝트가 되었다.

'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce 2  (0) 2017.05.03
Map Reduce 1  (0) 2017.05.02

MapReduce는 일종의 함수형 프로그래밍이다. map, reduce라는 합쳐진 용어로두 함수의 조합을 통해서 분산/병렬 시스템을 운용을 지원한다.

 

데이터 형태는 <Key , Value>형태로 저장이 되는데 여기서 키는 key는 라인이 시작되는 byte offset이며, value는 라인자체의 내용이다. 일반적으로 key는 관련이 없는 것으로 간주한다.


map(key, value) -> [(key,value),(key,value),(key,value)]

ex)

i am a

boy you

are a girl

==>

input : <1, "i am a"><2, "boy you"><3,"are a girl">

output : <i:1><am:1><a:2> ...<girl:1> (<key value>의 리스트형태)

 

맵 함수의 입력으로 키(k1), (v1)이 전달 되면맵 함수는 전달된 키-값을 이용해 사용자의 로직을 처리 (이미 구현된 map이 있지만사용자가 구현도 가능함.) 한 후 출력으로 새로운 키(k2)와 값(v2)의 리스트형태로 출력 한다이 때 키-값이 그대로 출력될 수 있으며출력 데이터의 갯수는 0 또는 1개 이상 일 수 있다.

 

맵 함수가 반복적으로 수행 되면 여러 개의 출력 데이터가 생성 되고이 출력 데이터를 키로 정렬하면 각 키에 여러 개의 데이터가 존재한다이 키(k2)와 값 목록(list(v2))이 리듀스 함수로 입력 된다리듀스 함수는 키-값 목록을 파라미터로 받아 사용자의 로직을 처리한 후 여러 개의 값을 출력 한다.

 


위 그림은 word Counter에 대한 예제이다.

 맵리듀스 프레임워크는 입력 파일의 값을 라인 단위로 맵 함수에 전달. (key : 해당 라인의 번호, value : 해당 라인의 내용)
 맵 함수는 공백을 기준으로 문자를 분리 한 후, 단어의 개수인 1을 출력
 맵 함수를 거쳐서 임시 결과가 출력
 출력결과를 키로 정렬 한 후, 각 값을 나열해 목록을 생성
 이렇게 정렬/병합 된 값(컴바인된 값)이 리듀스 함수로 전달
 리듀스 함수에서는 키에 단어가 전달되고, 값에 글자수 목록이 전달
 리듀스 함수는 값으로 글자수 목록에 반복을 수행 하면서, 합을 계산해 단어와 합을 출력

 

 

위의 예제를 수행 하기 위한 map 함수 와 reduce 함수이다. 참고하기 바란다.

map 함수

 

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);

 

  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    StringTokenizer itr = new StringTokenizer(value.toString());

    while (itr.hasMoreTokens()) {

      context.write(new Text(itr.nextToken()), one);

    }

  }

}


reduce 함수

 

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

 

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

    int sum = 0;

    for (IntWritable val : values) {

      sum += val.get();

    }

    result.set(sum);

    context.write(key, result);

  }

}


'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
Map Reduce 1  (0) 2017.05.02

맵리듀스(MapReduce)?

대용량 데이터를 처리를 위한 분산 프로그래밍 모델


-  글에서 2004년 발표한 소프트웨어 프레임워크


타고난 병행성(병렬 처리 지원)을 내포


누구든지 임의로 활용할 수 있는 충분한 서버를 이용하여 대규모 데이터 분석 가능


흩어져 있는 데이터를 수직화하여그 데이터를 각각의 종류 별로 모으고(Map),

  Filtering Sorting을 거쳐 데이터를 뽑아내는(Reduce) 하는 분산처리 기술과 관련 프레임워크를 의미

mapreduce 여러노드에 task를 분배하는 방법

hadoop클러스터의 데이터를 처리하기 위한 시스템

Map단계, reduce단계 = fork-join이랑 비슷하다.

map: block마다 같은 작업 수행, reduce: 합치기

 


맵리듀스는 맵(Map) 단계와 리듀스(Reduce) 단계로 처리 과정을 나누어 작업

각 단계는 입력과 출력으로써 - 쌍을 가지고 있고그 타입은 프로그래머가 선택합니다또한맵과 리듀스 함수도 프로그래머가 직접 


작성하게 됩니다


Map은 흩어져 있는 데이터를 Key, Value의 형태로 연관성 있는 데이터 분류로 묶는 작업

Reduce Map화한 작업 중 중복 데이터를 제거하고 원하는 데이터를 추출하는 작업


MapReduce Logical Data Flow




(map)은 흩어져 있는 데이터를 관련 있는 데이터끼리 묶는 작업을 통해서 임시 데이터 집합으로 변형되며,

리듀스(Reduce)는 맵 작업에서 생성된 임시 데이터 집합에서 중복 데이터를 제거하고 원하는 데이터를 추출하는 작업을 진행합니다.

 

 

맵리듀스 잡(MapReduce Job): Client 수행 작업 단위

클라이언트가 수행하려는 작업 단위로써 입력 데이터맵리듀스 프로그램설정 정보로 구성


하둡은 Job Map Task Reduce Task로 작업을 나누어서 실행한다.


- Job 실행 과정을 “제어해주는 노드


 

잡 트래커(Job Tracker): 태스크 트래커가 수행할 Task 스케줄링시스템 전체 수행을 조절.

 태스크 트래커(Task Tracker): Task 수행하고잡트래커에게 전체 경과 보고.



맵리듀스 과정에서 데이터가 어떤 식으로 흘러가고 처리되는지 자세히 알아봅시다.

 

잡 실행과정

1 : N 방식

1 - 잡 트래커(노드)

N - 태스크 트래커(노드)

 

  • 잡 트래커 : 태스크 트래커가 수행할 태스크를 스케줄링 함으로써 시스템 전체에서 모든 잡이 수행되도록 조절.
  • 태스크 트래커 : 태스크를 수행하고 각 잡의 전체 경과를 하나의 레코드로 유지하는 경과 보고서를 잡 트래커에 보냄. (태스크가 실패하면 잡 트래커는 그것을 다른 태스크 트래커에 다시 스케줄 한다.
  • 입력스플릿&스플릿 : 맵리듀스 잡의 입력크기. 각 스플릿마다 하나의 맵 태스크를 생성하고, 그 스플릿에 있는 각 레코드를 사용자 정의 맵 함수로 처리한다.





  • Split(Block)

- 하둡은 입력된 데이터를 고정된 크기의 조각으로 나눈다 ⇒ Split

- 각 Split 마다 하나의 Map Task 생성해 그 split의 레코드를 Map함수로 처리한다.

- 전체 입력을 통째로 처리하는 것 보다 시간이 더 짧게 걸린다.

- 보통 64MB의 HDFS Block을 사용하는 추세이다.

 

  • Map Tasks Status 

※Node == computer


① Data–local

: HDFS 내의 입력 Data가 있는 노드에서 Map Task 실행

- 데이터 지역성 최적화(Data Locality Optimization). 네트워크 대역폭을 사용하지 않아 가장 잘 작동


② Rack–local

: 동일 랙 중 다른 노드에서 찾아와 실행

- HDFS 블록 복제 본이 저장된 세 개의 노드 모두가 다른 맵 태스크 실행 중일 때도 있다. 이럴 경우 잡트래커는 블록 복제 본이 저장된 동일 랙 중 다른 노드에서 이용 가능한 맵 슬롯 가져온다.


③ Off-rack

: 다른 외부 랙의 노드에서 찾아와 실행

- 랙 간 네트워크 전송을 해야만 하기 때문에 네트워크 대역폭 사용.

    


각 태스크 결과물 저장 장소 

Map Task 

 로컬디스크

 맵 결과물은 중간 결과물

Reduce Task 

HDFS 

최종결과물이므로 안정성을 위해 HDFS에 저장한다. 


 

1. 단일 리듀스 태스크

: 모든 중간 데이터를 혼자 처리해야 해서 매우 느려진다.


















2. 다중 리듀스 태스크

: Map 태스크는 Reduce 태스크 개수만큼 파티션을 생성하고 결과를 분배한다.



















3. 리듀스 태스크 없음 

: 완전히 병렬로 수행. 셔플이 필요없는 경우에 적합.


 












컴바이너 함수 (Combiner Function)




대역폭은 제한적. 따라서 데이터 전송은 줄일수록 좋다.

- 맵과 리듀스 태스크 간 데이터전송을 최소화 하는 것이 좋다.

- 최적화와 관련 있기 때문에 필수 사항은 아니다. 0번을 써도, 여러 번 호출 되도 출력 결과는 같다.

- 매퍼와 리듀서 사이에서 셔플할 데이터의 양을 줄이는데 큰 도움을 준다.




출처: http://over153cm.tistory.com/entry/맵리듀스MapReduce란-1 [빅데이터는 넘커]
출처: http://over153cm.tistory.com/entry/맵리듀스란-2 [빅데이터는 넘커]

http://bigbigdata.tistory.com/5

'Hadoop ecosystem > MapReduce' 카테고리의 다른 글

Indexing  (0) 2018.04.14
InputType  (0) 2018.04.14
MapReduce 3  (1) 2018.04.09
MapReduce프로그래밍을 위한 HL  (0) 2017.05.04
MapReduce 2  (0) 2017.05.03

+ Recent posts