Friday, March 17, 2017

Reducer task is not being invoked in my MapReduce job

It's a word-count MapReduce job with a custom InputFormat.

JobExecutor:

    val job = new Job(new Configuration())

    job.setMapperClass(classOf[CountMapper])
    job.setReducerClass(classOf[CountReducer])

    job.setJobName("tarun-test-1")
    job.setInputFormatClass(classOf[MyInputFormat])
    FileInputFormat.setInputPaths(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))

    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[LongWritable])

    job.setNumReduceTasks(1)

    println("status: " + job.waitForCompletion(true))

Mapper:

    class CountMapper extends Mapper[LongWritable, Text, Text, LongWritable] {

        private val valueOut = new LongWritable(1L)

        override def map(k: LongWritable, v: Text, context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
            val str = v.toString
            str.split(",").foreach(word => {
                val keyOut = new Text(word.toLowerCase.trim)
                context.write(keyOut, valueOut)
            })
        }
    }

Reducer:

    class CountReducer extends Reducer[Text, LongWritable, Text, LongWritable] {

        override def reduce(k: Text, values: Iterable[LongWritable], context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
            println("Inside reduce method..")
            val valItr = values.iterator()
            var sum = 0L
            while (valItr.hasNext) {
                sum = sum + valItr.next().get()
            }

            context.write(k, new LongWritable(sum))
            println("done reducing.")
        }
    }

Based on the logs, the mapper is being invoked and the RecordReader is reading splits properly. However, the reducer is not being invoked.

1 Answer

Try setting the map output types explicitly with job.setMapOutputKeyClass and job.setMapOutputValueClass.
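The suggestion above could be applied roughly as in the sketch below. The object name JobTypes and the helper declareTypes are hypothetical names introduced only for illustration. The Text and LongWritable types are taken from the mapper and reducer signatures in the question. Whether this resolves the problem is not confirmed in the thread.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job

// Hypothetical helper (not part of the original post): declares the
// intermediate and final key/value types on an existing Job.
object JobTypes {
  def declareTypes(job: Job): Unit = {
    // Types emitted by the mapper and consumed by the reducer.
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[LongWritable])

    // Types written by the reducer to the job output.
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[LongWritable])
  }
}
```

In the JobExecutor from the question, this would be called as JobTypes.declareTypes(job) before job.waitForCompletion(true).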
