import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> { //KeyIn, ValueIn, KeyOut, ValueOut
    //IntWritable: an int-valued type that is both Writable and Comparable
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    //KeyIn, ValueIn
    /*
    On the top of the Crumpetty Tree
    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    (0, On the top of the Crumpetty Tree)
    (33, The Quangle Wangle sat,)
    (57, But his face you could not see,)
    (89, On account of his Beaver Hat.)
    */
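    /*
    For the first pair above, map() emits one (word, 1) pair per token, e.g.
    (On, 1), (the, 1), (top, 1), (of, 1), (the, 1), (Crumpetty, 1), (Tree, 1).
    */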
    //map receives { byte offset, line as String, Context }
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      //Tokenize value on whitespace.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        //KeyOut, ValueOut
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> { //KeyIn, ValueIn, KeyOut, ValueOut
    private IntWritable result = new IntWritable();
    //KeyIn, ValueIn
    public void reduce(Text key, Iterable<IntWritable> values, Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      //KeyOut, ValueOut
      context.write(key, result);
    }
  }
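  /*
  After the shuffle, identical keys from every mapper are grouped together, so
  for the sample input the reducer receives pairs such as (his, [1, 1]) and
  (the, [1, 1]) and emits (his, 2) and (the, 2).
  */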
  public static void main(String[] args) throws Exception {
    //Initialize the configuration
    Configuration conf = new Configuration();
    //Create the Job instance
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    //Set the mapper/combiner/reducer classes on the Job instance
    job.setMapperClass(TokenizerMapper.class);
    //Running a reduce step locally on each node (combining) cuts down the
    //amount of data sent over the network.
    job.setCombinerClass(IntSumReducer.class);
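    //For example, if a node's map output contains (the, 1) twice, only
    //(the, 2) is shipped to the reducer after the combiner runs. (Hadoop may
    //run the combiner zero or more times, so the final result must not
    //depend on whether it runs.)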
    //After the shuffle, the reducer input on each node is grouped per key,
    //e.g. (key, [1, 2, 3, 1, 1, 2]).
    job.setReducerClass(IntSumReducer.class);
    //Set the key/value classes for the job's final output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //Adds the path to the job configuration's input dirs (mapred.input.dir).
    FileInputFormat.addInputPath(job, new Path(args[0]));
    //Set the output dir.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //Execute the application.
    //Blocking call (does not return until execution completes):
    //submit the job, then poll for progress until the job is complete.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    //When the job runs, it internally uses InputSplits and a RecordReader to
    //build the {key, value} pairs handed to map.
  }
}
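
/*
A minimal sketch of how one might compile and run this job, assuming a
configured Hadoop installation (following the official WordCount tutorial);
the jar name and HDFS paths are illustrative:

  $ bin/hadoop com.sun.tools.javac.Main WordCount.java
  $ jar cf wc.jar WordCount*.class
  $ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
*/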