WordCount
snoohey
2018. 4. 14. 22:52
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {  // KeyIn, ValueIn, KeyOut, ValueOut
    // IntWritable: an int wrapper type that is both Writable and Comparable.

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Input (KeyIn, ValueIn) pairs, e.g. for the lines
    /*
      On the top of the Crumpetty Tree
      The Quangle Wangle sat,
      But his face you could not see,
      On account of his Beaver Hat.

      (0,  On the top of the Crumpetty Tree)
      (33, The Quangle Wangle sat,)
      (57, But his face you could not see,)
      (89, On account of his Beaver Hat.)
    */
    public void map(Object key, Text value, Context context)  // { byte offset, line, Context }
        throws IOException, InterruptedException {
      // Tokenize the value on whitespace.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // Emit (KeyOut, ValueOut) = (word, 1)
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {  // KeyIn, ValueIn, KeyOut, ValueOut
    private IntWritable result = new IntWritable();

    // KeyIn, ValueIn
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      // Emit (KeyOut, ValueOut) = (word, total count)
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // Initialize the configuration
    Configuration conf = new Configuration();

    // Create a Job instance
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);

    // Set the job's classes
    job.setMapperClass(TokenizerMapper.class);
    // The combiner runs a local reduce on each node, cutting down the amount of data shuffled.
    job.setCombinerClass(IntSumReducer.class);
    // After the shuffle, the reducer input on each node is grouped per key as (key, [1,2,3,1,1,2]).
    job.setReducerClass(IntSumReducer.class);

    // Set the job's final output key/value classes
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Add this path to the job configuration (mapred.input.dir).
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Set the output directory.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Execute the application.
    // Blocking call (does not return until execution completes):
    // submit the job, then poll for progress until the job is complete.
    // When the job runs, InputSplits and RecordReaders are created internally
    // to produce {key, value} pairs suitable for the map function.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
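To build the wc.jar used below, one common setup (paths are examples and will differ per machine) is to export JAVA_HOME, put the JDK's tools.jar on HADOOP_CLASSPATH, then compile against the Hadoop classpath and package the classes:
$ export JAVA_HOME=/usr/java/default
$ export PATH=${JAVA_HOME}/bin:${PATH}
$ export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class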
Sample text-files as input:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
Run the application:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
Output:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
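Because IntSumReducer is also registered as the combiner, each map task's output is summed locally before the shuffle. Assuming each input file is handled by its own map task, the intermediate records look like this:
Map output for file01:        (Hello, 1) (World, 1) (Bye, 1) (World, 1)
After the combiner (file01):  (Bye, 1) (Hello, 1) (World, 2)
Map output for file02:        (Hello, 1) (Hadoop, 1) (Goodbye, 1) (Hadoop, 1)
After the combiner (file02):  (Goodbye, 1) (Hadoop, 2) (Hello, 1)
The reducer then sums the values for each key across both map outputs, producing the final counts shown above.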