Saturday, July 9, 2016

Sorting JavaPairRDD first by value and then by key

I'm trying to sort an RDD by value, and if multiple values are equal then I need to sort these values by key lexicographically.

JavaPairRDD<String, Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(
    new PairFunction<Tuple2<String, MovieReview>, String, Long>() {
        @Override
        public Tuple2<String, Long> call(Tuple2<String, MovieReview> t) throws Exception {
            return new Tuple2<String, Long>(t._1, t._2.count);
        }
    });

What I have done so far is to use takeOrdered with a custom Comparator. However, takeOrdered returns its result to the driver as a List and can't handle a large amount of data: when I run the code, the job keeps exiting (it uses more memory than the OS can provide):

List<Tuple2<String, Long>> rddSorted = rddMovieReviewReducedByKey.mapToPair(
    new PairFunction<Tuple2<String, MovieReview>, String, Long>() {
        @Override
        public Tuple2<String, Long> call(Tuple2<String, MovieReview> t) throws Exception {
            return new Tuple2<String, Long>(t._1, t._2.count);
        }
    }).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);


    static class MapLongValueComparator implements Comparator<Tuple2<String, Long>>, Serializable {
        private static final long serialVersionUID = 1L;

        private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();

        @Override
        public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
            // Equal values: fall back to the keys in lexicographic order.
            if (o1._2.compareTo(o2._2) == 0) {
                return o1._1.compareTo(o2._1);
            }
            // Otherwise order by value, largest first.
            return -o1._2.compareTo(o2._2);
        }
    }


16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at, took 418.149182 s 

How would you sort this RDD? How would you take the top K movies by value, breaking ties between equal values by key in lexicographic order?
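To make the requested ordering concrete, here is a minimal sketch in plain Java (no Spark involved) that keeps only the best K entries in a bounded heap, so memory grows with K rather than with the input size. The class name `TopKSketch`, the method `topK`, and the sample titles and counts are all hypothetical and only serve the illustration; `Map.Entry` stands in for `Tuple2`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class TopKSketch {

    // Desired order: higher count first; equal counts ordered by key, ascending.
    static final Comparator<Map.Entry<String, Long>> VALUE_THEN_KEY = (a, b) -> {
        int byCount = b.getValue().compareTo(a.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    // Returns the first k entries in VALUE_THEN_KEY order, holding at most k + 1 entries at a time.
    static List<Map.Entry<String, Long>> topK(Iterable<Map.Entry<String, Long>> entries, int k) {
        if (k <= 0) {
            return new ArrayList<>();
        }
        // With the reversed comparator, the heap head is the worst retained entry,
        // so poll() evicts exactly the entry that should fall out of the top k.
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>(k, VALUE_THEN_KEY.reversed());
        for (Map.Entry<String, Long> e : entries) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll();
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort(VALUE_THEN_KEY);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: (title, review count).
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<String, Long>("b", 5L));
        counts.add(new SimpleEntry<String, Long>("a", 5L));
        counts.add(new SimpleEntry<String, Long>("c", 9L));
        counts.add(new SimpleEntry<String, Long>("d", 1L));
        System.out.println(topK(counts, 3)); // prints [c=9, a=5, b=5]
    }
}
```

The tie between "a" and "b" (both 5) is resolved by key, which is the behavior asked for above.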


2 Answers

Answers 1

Solved the problem using sortByKey with a comparator and an explicit number of partitions, after mapping the <String, Long> pair RDD to a <Tuple2<String, Long>, Long> pair RDD:

JavaPairRDD<Tuple2<String, Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(
    new PairFunction<Tuple2<String, MovieReview>, Tuple2<String, Long>, Long>() {
        @Override
        public Tuple2<Tuple2<String, Long>, Long> call(Tuple2<String, MovieReview> t) throws Exception {
            // Move the whole (key, count) pair into the key position so sortByKey can compare both.
            return new Tuple2<Tuple2<String, Long>, Long>(
                new Tuple2<String, Long>(t._1, t._2.count), t._2.count);
        }
    }).sortByKey(new TupleMapLongComparator(), true, 100);

// Unwrap the composite key back into a plain (key, count) pair.
JavaPairRDD<String, Long> sortedRddToPairs = sortedRdd.mapToPair(
    new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, Long>() {
        @Override
        public Tuple2<String, Long> call(Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
            return new Tuple2<String, Long>(t._1._1, t._1._2);
        }
    });


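// Declared static so that serializing the comparator does not also pull in the enclosing instance.
private static class TupleMapLongComparator implements Comparator<Tuple2<String, Long>>, Serializable {
    @Override
    public int compare(Tuple2<String, Long> tuple1, Tuple2<String, Long> tuple2) {
        // Equal counts: fall back to the keys in lexicographic order.
        if (tuple1._2.compareTo(tuple2._2) == 0) {
            return tuple1._1.compareTo(tuple2._1);
        }
        // Otherwise order by count, largest first.
        return -tuple1._2.compareTo(tuple2._2);
    }
}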
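The three steps above (wrap the pair into a composite key, sort by that key, unwrap) can be sketched without a cluster. The following is a minimal plain-Java illustration using streams, not the Spark code itself; `CompositeKeySortSketch`, `Pair`, and `sortByCompositeKey` are hypothetical names, with `Pair` standing in for `scala.Tuple2`, and the sample data is made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class CompositeKeySortSketch {

    // Hypothetical stand-in for scala.Tuple2, so the sketch runs without Spark.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "(" + first + "," + second + ")";
        }
    }

    // Same ordering as TupleMapLongComparator: count descending, then title ascending.
    static final Comparator<Pair<String, Long>> COUNT_DESC_THEN_TITLE = (a, b) -> {
        int byCount = b.second.compareTo(a.second);
        return byCount != 0 ? byCount : a.first.compareTo(b.first);
    };

    static List<Pair<String, Long>> sortByCompositeKey(List<Pair<String, Long>> counts) {
        return counts.stream()
                // Step 1: promote the whole (title, count) pair to the key position.
                .map(p -> new Pair<Pair<String, Long>, Long>(p, p.second))
                // Step 2: sort on the composite key only.
                .sorted((x, y) -> COUNT_DESC_THEN_TITLE.compare(x.first, y.first))
                // Step 3: unwrap the composite key back into (title, count).
                .map(kv -> kv.first)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample data: (title, review count).
        List<Pair<String, Long>> counts = new ArrayList<>();
        counts.add(new Pair<String, Long>("b", 5L));
        counts.add(new Pair<String, Long>("a", 5L));
        counts.add(new Pair<String, Long>("c", 9L));
        counts.add(new Pair<String, Long>("d", 1L));
        System.out.println(sortByCompositeKey(counts)); // prints [(c,9), (a,5), (b,5), (d,1)]
    }
}
```

In the Spark version, once the RDD is sorted this way, the top K movies could presumably be read off with `take(k)` on `sortedRddToPairs`, which only brings K elements back to the driver.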

Answers 2

Did you try secondary sorting in Spark?

Spark Secondary Sort

