It's a word-count MapReduce job. I have my own InputFormat.
JobExecutor:
val job = new Job(new Configuration())
job.setMapperClass(classOf[CountMapper])
job.setReducerClass(classOf[CountReducer])
job.setJobName("tarun-test-1")
job.setInputFormatClass(classOf[MyInputFormat])
FileInputFormat.setInputPaths(job, new Path(args(0)))
FileOutputFormat.setOutputPath(job, new Path(args(1)))
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[LongWritable])
job.setNumReduceTasks(1)
println("status: " + job.waitForCompletion(true))
Mapper:
class CountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  private val valueOut = new LongWritable(1L)

  override def map(k: LongWritable, v: Text,
                   context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
    val str = v.toString
    str.split(",").foreach(word => {
      val keyOut = new Text(word.toLowerCase.trim)
      context.write(keyOut, valueOut)
    })
  }
}
Reducer:
// Hadoop's Reducer.reduce takes java.lang.Iterable; without this import,
// Scala resolves Iterable to scala.Iterable and the method does not override.
import java.lang.Iterable

class CountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
  override def reduce(k: Text, values: Iterable[LongWritable],
                      context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
    println("Inside reduce method..")
    val valItr = values.iterator()
    var sum = 0L
    while (valItr.hasNext) {
      sum = sum + valItr.next().get()
    }
    context.write(k, new LongWritable(sum))
    println("done reducing.")
  }
}
The mapper is being invoked and the RecordReader is reading splits properly, based on the logs. However, the reducer is not being invoked.
1 Answer
Try setting job.setMapOutputKeyClass and job.setMapOutputValueClass.
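In the driver above, that would look something like the sketch below (assuming the Text/LongWritable intermediate types the mapper already emits). Note that when these are not set, Hadoop falls back to job.setOutputKeyClass / job.setOutputValueClass for the map output types, so a mismatch can surface only at shuffle time:

```scala
// Declare the mapper's intermediate key/value types explicitly,
// before submitting the job with waitForCompletion.
job.setMapOutputKeyClass(classOf[Text])
job.setMapOutputValueClass(classOf[LongWritable])
```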