import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class WordCount {
    
    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> { // KeyIn, ValueIn, KeyOut, ValueOut

        // IntWritable: an Integer-like type that is both Writable and Comparable
        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();
                        // KeyIn, ValueIn
        
        /*
        TextInputFormat hands map each line as a (byte offset, line text) pair:

        On the top of the Crumpetty Tree
        The Quangle Wangle sat,
        But his face you could not see,
        On account of his Beaver Hat.

        (0, On the top of the Crumpetty Tree)
        (33, The Quangle Wangle sat,)
        (57, But his face you could not see,)
        (89, On account of his Beaver Hat.)
        */
        
        public void map(Object key, Text value, Context context)
                        // { byte offset, line as String, Context }
                            throws IOException, InterruptedException {

            // tokenize value on whitespace
            StringTokenizer itr = new StringTokenizer(value.toString());
 
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                
                //KeyOut, ValueOut
                context.write(word, one);
            }
 
        }
    }
    public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> { //KeyIn, ValueIn, KeyOut, ValueOut 
 
        private IntWritable result = new IntWritable();

                        // KeyIn, ValueIn
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                            throws IOException, InterruptedException {
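            // e.g. without a combiner: key = "World", values = [1, 1] -> writes (World, 2)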
            int sum = 0;
            
            for (IntWritable val : values) {
                sum += val.get();
            }
            
            result.set(sum);
            
            //KeyOut, ValueOut 
            context.write(key, result);
        }
    }
 
    public static void main(String[] args) throws Exception {
        
        //initialize configuration
        Configuration conf = new Configuration();
 
        //create Job Instance
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        
        //Setting Classes using Job Instance
        job.setMapperClass(TokenizerMapper.class);
 
        // run a reduce pass locally on each node first, cutting the amount of data sent over the network
        job.setCombinerClass(IntSumReducer.class);
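        /*
        e.g. (illustrative) map output on the node that processed "Hello World Bye World":
            (Hello,1) (World,1) (Bye,1) (World,1)
        after the combiner runs locally, only three pairs leave the node:
            (Bye,1) (Hello,1) (World,2)
        */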
 
        // by the time reduce runs, the shuffle has grouped map output per key on each node, e.g. (key, [1,2,3,1,1,2])
        job.setReducerClass(IntSumReducer.class);
 
        // set the classes of the job's final output key/value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // adds this path to the job configuration (mapred.input.dir)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        // set the output directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
        // execute the application
        // blocking call (does not return until the run finishes)
        // submit the job, then poll for progress until the job is complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
        // when the job runs, it internally creates InputSplits and a RecordReader
        // to build the (key, value) pairs fed to map
    }
}
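
To check what the pipeline computes without a cluster, here is a minimal plain-JVM sketch (the WordCountSketch class and the inlined sample lines are illustrative assumptions, not part of the Hadoop API): it applies the same StringTokenizer split as TokenizerMapper and sums per-word counts the way IntSumReducer does, with a TreeMap standing in for the sorted shuffle output.

import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = {
            "Hello World Bye World",      // file01
            "Hello Hadoop Goodbye Hadoop" // file02
        };

        // "map": emit (word, 1) per token; "shuffle + reduce": sum per key.
        // TreeMap keeps keys sorted, like the final part-r-00000 file.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line); // same tokenization as TokenizerMapper
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }

        // prints: Bye 1 / Goodbye 1 / Hadoop 2 / Hello 2 / World 2
        counts.forEach((w, n) -> System.out.println(w + " " + n));
    }
}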
 
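To build the wc.jar used below, the official Hadoop tutorial compiles against the Hadoop classpath roughly like this (assuming HADOOP_CLASSPATH includes ${JAVA_HOME}/lib/tools.jar):

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class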





Sample text files as input:

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

Run the application:

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

Output:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
