I'm trying to sort an RDD by value, and if multiple values are equal I need to sort those entries by key lexicographically.
Code:
    JavaPairRDD<String, Long> rddToSort = rddMovieReviewReducedByKey
        .mapToPair(new PairFunction<Tuple2<String, MovieReview>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, MovieReview> t) throws Exception {
                return new Tuple2<String, Long>(t._1, t._2.count);
            }
        });
What I have done so far is use takeOrdered with a custom Comparator, but takeOrdered can't handle a large amount of data: when I run the code it keeps exiting (it uses more memory than the OS can provide):
    List<Tuple2<String, Long>> rddSorted = rddMovieReviewReducedByKey
        .mapToPair(new PairFunction<Tuple2<String, MovieReview>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, MovieReview> t) throws Exception {
                return new Tuple2<String, Long>(t._1, t._2.count);
            }
        })
        .takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);
Comparator:
    static class MapLongValueComparator implements Comparator<Tuple2<String, Long>>, Serializable {
        private static final long serialVersionUID = 1L;
        private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();

        @Override
        public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
            // Equal counts: fall back to the key, ascending.
            if (o1._2.compareTo(o2._2) == 0) {
                return o1._1.compareTo(o2._1);
            }
            // Otherwise: higher count first.
            return -o1._2.compareTo(o2._2);
        }
    }
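To check the ordering this comparator produces without starting a Spark job, the same rule can be tried on a plain list. The sketch below is an illustration only: it uses `Map.Entry<String, Long>` as a stand-in for `Tuple2<String, Long>` so that it runs without Spark on the classpath, and the class name `ValueThenKeyOrder`, the method `sortByValueThenKey`, and the sample titles are made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class ValueThenKeyOrder {

    // Same rule as MapLongValueComparator: higher count first,
    // ties broken by key in ascending lexicographic order.
    static final Comparator<Map.Entry<String, Long>> VALUE_DESC_THEN_KEY = (a, b) -> {
        int byValue = b.getValue().compareTo(a.getValue());
        return byValue != 0 ? byValue : a.getKey().compareTo(b.getKey());
    };

    // Returns a sorted copy and leaves the input list untouched.
    static List<Map.Entry<String, Long>> sortByValueThenKey(List<Map.Entry<String, Long>> input) {
        List<Map.Entry<String, Long>> copy = new ArrayList<>(input);
        copy.sort(VALUE_DESC_THEN_KEY);
        return copy;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<>("movieB", 3L));
        counts.add(new SimpleEntry<>("movieC", 5L));
        counts.add(new SimpleEntry<>("movieA", 3L));

        // movieC has the highest count; movieA and movieB tie and are ordered by key.
        for (Map.Entry<String, Long> e : sortByValueThenKey(counts)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

With the sample data, movieC comes first, followed by movieA and movieB, which share a count and are therefore ordered by title.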
ERROR:
16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s
How would you sort this RDD? How would you take the TopKMovies, ordering by value and, when values are equal, by key lexicographically?
Thanks.
2 Answers
Answer 1
Solved the problem using sortByKey with a comparator and an explicit number of partitions, after mapping the <String, Long> PairRDD to a <Tuple2<String,Long>, Long> PairRDD:
    JavaPairRDD<Tuple2<String, Long>, Long> sortedRdd = rddMovieReviewReducedByKey
        .mapToPair(new PairFunction<Tuple2<String, MovieReview>, Tuple2<String, Long>, Long>() {
            @Override
            public Tuple2<Tuple2<String, Long>, Long> call(Tuple2<String, MovieReview> t) throws Exception {
                return new Tuple2<Tuple2<String, Long>, Long>(new Tuple2<String, Long>(t._1, t._2.count), t._2.count);
            }
        })
        .sortByKey(new TupleMapLongComparator(), true, 100);

    JavaPairRDD<String, Long> sortedRddToPairs = sortedRdd
        .mapToPair(new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, Long>() {
            @Override
            public Tuple2<String, Long> call(Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
                return new Tuple2<String, Long>(t._1._1, t._1._2);
            }
        });
Comparator:
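    // Declared static so that serializing the comparator does not also pull in the enclosing instance.
    private static class TupleMapLongComparator implements Comparator<Tuple2<String, Long>>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Tuple2<String, Long> tuple1, Tuple2<String, Long> tuple2) {
            // Equal counts: fall back to the title, ascending.
            if (tuple1._2.compareTo(tuple2._2) == 0) {
                return tuple1._1.compareTo(tuple2._1);
            }
            // Otherwise: higher count first.
            return -tuple1._2.compareTo(tuple2._2);
        }
    }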
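The composite-key idea can be tried locally before running it on a cluster. The following is a rough sketch in plain Java, not Spark code: a `TreeMap` ordered by a comparator stands in for `sortByKey`, `Map.Entry<String, Long>` stands in for `Tuple2<String, Long>`, and the names `CompositeKeySort` and `topK` as well as the sample data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class CompositeKeySort {

    // Orders composite (title, count) keys by count descending, then title ascending.
    static final Comparator<Map.Entry<String, Long>> BY_COUNT_THEN_TITLE = (a, b) -> {
        int byCount = b.getValue().compareTo(a.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    // Step 1: wrap each (title, count) pair in a composite key, like the first mapToPair.
    // Step 2: let the sorted map order the composite keys, like sortByKey with a comparator.
    // Step 3: project back to (title, count) and keep only the first k entries.
    static Map<String, Long> topK(Map<String, Long> counts, int k) {
        TreeMap<Map.Entry<String, Long>, Long> sorted = new TreeMap<>(BY_COUNT_THEN_TITLE);
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            sorted.put(new SimpleEntry<String, Long>(e.getKey(), e.getValue()), e.getValue());
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> key : sorted.keySet()) {
            if (result.size() == k) {
                break;
            }
            result.put(key.getKey(), key.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("movieB", 3L);
        counts.put("movieC", 5L);
        counts.put("movieA", 3L);
        counts.put("movieD", 1L);
        System.out.println(topK(counts, 3)); // prints {movieC=5, movieA=3, movieB=3}
    }
}
```

On the actual RDD, the counterpart of step 3 would presumably be a `take(k)` on the sorted pair RDD, which returns only the first k elements to the driver.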
Answer 2
Did you try secondary sorting in Spark?