Reducer task is not being invoked in my MapReduce job

It's a word-count MapReduce job that uses my own InputFormat.


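    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

    val job = new Job(new Configuration())

    job.setMapperClass(classOf[CountMapper])
    job.setReducerClass(classOf[CountReducer])

    job.setJobName("tarun-test-1")
    job.setInputFormatClass(classOf[MyInputFormat])
    FileInputFormat.setInputPaths(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))

    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[LongWritable])

    job.setNumReduceTasks(1)

    println("status: " + job.waitForCompletion(true))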


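    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Mapper

    class CountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {

      // Reused for every record: each word is emitted with a count of 1
      private val valueOut = new LongWritable(1L)

      override def map(k: LongWritable, v: Text,
                       context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
        val str = v.toString
        // Split the line on commas and emit (word, 1) for each token
        str.split(",").foreach(word => {
          val keyOut = new Text(word.toLowerCase.trim)
          context.write(keyOut, valueOut)
        })
      }
    }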
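
To check what the mapper should be handing to the shuffle, its tokenization can be exercised without a cluster. This is a minimal plain-Scala sketch that assumes the same split/lowercase/trim logic as map(); the MapperSketch object and tokenize function are hypothetical names, and (String, Long) pairs stand in for (Text, LongWritable):

```scala
// Hypothetical plain-Scala stand-in for CountMapper's tokenization.
// (String, Long) pairs replace (Text, LongWritable) so no Hadoop is needed.
object MapperSketch {
  // Mirrors map(): split the line on commas, lowercase and trim each
  // token, and pair it with a count of 1.
  def tokenize(line: String): Seq[(String, Long)] =
    line.split(",").toSeq.map(word => (word.toLowerCase.trim, 1L))
}
```

For example, MapperSketch.tokenize("Apple, banana,APPLE") yields ("apple", 1), ("banana", 1), ("apple", 1). Because String.split keeps empty fields between commas, a line such as "a,,b" also produces an empty-string key.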


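    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Reducer

    class CountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {

      // Hadoop passes the values as a java.lang.Iterable, not a Scala Iterable
      override def reduce(k: Text, values: java.lang.Iterable[LongWritable],
                          context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
        println("Inside reduce method..")
        val valItr = values.iterator()
        var sum = 0L
        // Add up all the counts emitted by the mapper for this word
        while (valItr.hasNext) {
          sum = sum + valItr.next().get()
        }

        context.write(k, new LongWritable(sum))
        println("done reducing.")
      }
    }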
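
The reducer's loop has to add each value to the running sum; with LongWritable that means calling valItr.next().get(). A minimal sketch of the same loop that runs without Hadoop, assuming java.lang.Long as a stand-in for LongWritable (ReducerSketch and sum are hypothetical names):

```scala
// Hypothetical stand-in for the summing loop in CountReducer.reduce.
// java.lang.Long replaces LongWritable so this runs without Hadoop;
// with LongWritable the loop body would use valItr.next().get().
object ReducerSketch {
  // Takes a java.lang.Iterable, the same collection type Hadoop uses
  // for reduce values, and walks it with the Java iterator.
  def sum(values: java.lang.Iterable[java.lang.Long]): Long = {
    val valItr = values.iterator()
    var total = 0L
    while (valItr.hasNext) {
      total += valItr.next().longValue()
    }
    total
  }
}
```

Walking the Java iterator directly keeps the sketch close to the question's code and avoids any Scala/Java collection conversion.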

According to the logs, the mapper is being invoked and the RecordReader is reading the splits correctly. However, the reducer is not being invoked.

1 Answer

Try setting the map output classes explicitly with job.setMapOutputKeyClass(classOf[Text]) and job.setMapOutputValueClass(classOf[LongWritable]).

