Sunday, October 29, 2017

Spark UDAFs: How to minimize/eliminate datatype conversion, copying and boxing overheads

My Spark app is written in Scala. I am writing several UDAFs that perform computations on "vectorized" data, where each value is a constant-size array (say, 16) of doubles rather than a single scalar. Calculations are done on a per-element basis. My goal is to have the UDAFs execute as efficiently as possible: I would like the doubles to sit contiguously in memory, and for Spark's code gen plus the JVM's JIT compiler to turn the computation into SIMD instructions.

However, it seems that writing a UDAF in the straightforward, documented manner and using standard DataFrame functionality leads Spark to generate awfully inefficient layouts for its aggregation buffer and Row objects. The data in the arrays reaches my code typed as WrappedArray[Double], which is a proxy to an Object[] array containing 16 boxed Doubles. Not only does that take roughly 6-8x the memory of the raw data, it also makes SIMD impossible, since the doubles themselves are not in contiguous memory locations.
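For reference, the declarations involved look roughly like this. This is a sketch of my SumVec (in reality it lives in a MyUDAFs object and the field names here are mine), built on the documented UserDefinedAggregateFunction API:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch of the UDAF skeleton. The ArrayType(DoubleType) schemas are what
// make Spark hand my code a WrappedArray[Double] at runtime.
class SumVec(n: Int) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType  = StructType(StructField("value", ArrayType(DoubleType)) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("acc", ArrayType(DoubleType)) :: Nil)
  override def dataType: DataType       = ArrayType(DoubleType)
  override def deterministic: Boolean   = true
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Array.fill(n)(0.0)
  // update is shown below; merge and evaluate elided here
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
  override def evaluate(buffer: Row): Any = ???
}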

So, for example, a simple UDAF that does a "vectorized sum" has an update function that looks like this:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val acc = buffer.getAs[WrappedArray[Double]](0)
  if (!input.isNullAt(0)) {
    val value = input.getAs[WrappedArray[Double]](0)
    for (i <- 0 until N) {
      acc(i) += value(i)
    }
    buffer(0) = acc
  }
}

This is an operation that, in a sanely-written native program, would look something like this:

void update(double* acc, const double* input) {
  for (size_t i = 0; i != N; ++i) {
    acc[i] += input[i];
  }
}

What's happening in my UDAF? As far as I can tell, the mere fact that the buffer(0) = acc line at the end has to exist (without it, the accumulator doesn't update) means that array contents are being copied. So Spark first creates two new object arrays of size N (acc and value) and copies the original boxed Doubles into them; then the += creates a new boxed Double with the result for each element and puts it back into acc; and finally acc is copied back into the array in buffer(0).

That is just plain awful. I haven't run any profiling yet, but I fully expect the second code fragment to run about 20-50 times faster than this.
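For what it's worth, the best I've managed within the documented UDAF API is something like the following sketch. It still boxes at the boundaries, but it does a single unboxing pass into a primitive array, uses a while loop instead of a Range, and writes back once (I haven't measured whether this actually helps much):

// Sketch: one unboxing pass into a primitive Array[Double], per-element work
// on primitives, one write-back (which converts/boxes again on the way in).
// Still nowhere near the C loop.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  if (!input.isNullAt(0)) {
    val acc: Array[Double] = buffer.getSeq[Double](0).toArray // unboxes once
    val value = input.getSeq[Double](0)
    var i = 0
    while (i < N) {
      acc(i) += value(i)
      i += 1
    }
    buffer(0) = acc // single write-back
  }
}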

There has to be a better way to do this. I read a while ago about "Project Tungsten", and apparently Spark can operate on unmanaged memory buffers. I'm not a Spark developer, though, so I don't even know whether these capabilities are available to my UDAFs, and if so, how to use them. Is there some other way to at least eliminate the boxing and the useless array copying?
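One direction I've been eyeing is the typed Aggregator API, where at least the Scala-side buffer can be a primitive Array[Double]. I don't know whether the encoder round-trip actually keeps the doubles unboxed inside Tungsten rows, so this is just a sketch (the class name is mine, and ExpressionEncoder is technically internal API):

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Sketch of a typed alternative: the buffer is a primitive Array[Double] on
// the Scala side, so the inner loop runs on unboxed doubles. Whether the
// encoding to/from the buffer avoids boxing and copying is exactly what I
// don't know.
class SumVecAgg(n: Int) extends Aggregator[Array[Double], Array[Double], Array[Double]] {
  override def zero: Array[Double] = Array.fill(n)(0.0)
  override def reduce(acc: Array[Double], value: Array[Double]): Array[Double] = {
    var i = 0
    while (i < n) { acc(i) += value(i); i += 1 }
    acc
  }
  override def merge(a: Array[Double], b: Array[Double]): Array[Double] = reduce(a, b)
  override def finish(acc: Array[Double]): Array[Double] = acc
  override def bufferEncoder: Encoder[Array[Double]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Array[Double]] = ExpressionEncoder()
}

// Usage would be on a typed Dataset, e.g.:
//   ds.select(new SumVecAgg(4).toColumn)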

Edit: Example inputs and outputs are as follows:

case class TestInRow(id: Int, value: Array[Double])

// A UDAF that sums up arrays of 4 doubles
val sumVec = MyUDAFs.SumVec(4)

val inputDF = Seq(
  TestInRow(0, Array[Double](1, 2, 5, 10)),
  TestInRow(1, Array[Double](0, -1, 5, -12)),
  TestInRow(2, Array[Double](0, 0, 5, 10)),
  TestInRow(3, Array[Double](0, -1, 5, -8))
).toDF

val resultDF = inputDF.groupBy().agg(sumVec(col("value")) as "svec")

val output = resultDF.head.getAs[WrappedArray[Double]](0)
assert(output == Seq(1.0, 0.0, 20.0, 0.0))

NOTE: In the real test I actually build an expectedDF with that one row of expected values, then call a function that compares the DFs by joining on a key tuple (an empty tuple in this case) and checks that all doubles in the "value" columns match to within an error tolerance, along the lines of the sketch below.
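That helper looks roughly like this (a sketch; assertDFsMatch, the column names, and the default tolerance are all mine):

import org.apache.spark.sql.DataFrame

// Sketch of the DF comparison helper described above. Joins actual and
// expected on the key columns (cross join for an empty key tuple) and checks
// every double in the value arrays to within a tolerance.
def assertDFsMatch(actual: DataFrame, expected: DataFrame,
                   keys: Seq[String], valueCol: String, tol: Double = 1e-9): Unit = {
  val renamed = expected.withColumnRenamed(valueCol, "expected")
  val joined =
    if (keys.isEmpty) actual.crossJoin(renamed)
    else actual.join(renamed, keys)
  joined.collect().foreach { row =>
    val a = row.getAs[Seq[Double]](valueCol)
    val e = row.getAs[Seq[Double]]("expected")
    require(a.length == e.length, s"length mismatch: ${a.length} vs ${e.length}")
    a.zip(e).foreach { case (x, y) =>
      require(math.abs(x - y) <= tol, s"mismatch: $x vs $y")
    }
  }
}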

NOTE: This is taken from a unit test. In my real use case, the arrays are initially formed from an array(expr*) construct in a select(...) call on another DF that contains scalars. Something like this:

df.select(
  k1,
  k2,
  k3,
  array(
    sum(when(expr(predicates(0)(k4, k5)), qty).otherwise(lit(0.0))),
    sum(when(expr(predicates(1)(k4, k5)), qty).otherwise(lit(0.0))),
    sum(when(expr(predicates(2)(k4, k5)), qty).otherwise(lit(0.0))),
    ...
    sum(when(expr(predicates(N-1)(k4, k5)), qty).otherwise(lit(0.0)))
  ) as "packed_qty"
)

and then I want to aggregate with vectorizedSum(packed_qty), for example in an SQL query that has GROUP BY ... GROUPING SETS (...) over several combinations of (k1, k2, k3).
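That is, something along these lines (a sketch; the registered name, the view name, and packedDF are made up for illustration):

// Sketch: register the UDAF and use it under GROUPING SETS.
spark.udf.register("vectorizedSum", MyUDAFs.SumVec(N))
packedDF.createOrReplaceTempView("packed")
val rollups = spark.sql("""
  SELECT k1, k2, k3, vectorizedSum(packed_qty) AS qty_vec
  FROM packed
  GROUP BY k1, k2, k3
  GROUPING SETS ((k1, k2, k3), (k1, k2), (k1), ())
""")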

There are several other UDAFs, and they all work on the same principle: the inputs are arrays of N doubles, the UDAF performs some floating-point operation (or several) between them on a per-element basis, and it aggregates the result(s) into array(s) of size N. The merge step follows the same pattern, as sketched below.
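For completeness, here is what the merge half looks like for the sum UDAF (a sketch; my real merge isn't shown above, but it has the same shape and the same boxing/copying characteristics as update):

// Sketch of the corresponding merge step for the sum UDAF: same per-element
// pattern, same boxed accumulator round-trip as the update shown earlier.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  val acc1 = buffer1.getAs[WrappedArray[Double]](0)
  val acc2 = buffer2.getAs[WrappedArray[Double]](0)
  var i = 0
  while (i < N) {
    acc1(i) += acc2(i)
    i += 1
  }
  buffer1(0) = acc1
}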
