Monday, April 25, 2016

Why is a Spark Row object so big compared to equivalent structures?


I have been playing around with the java-sizeof library (https://github.com/phatak-dev/java-sizeof) and using it to measure data set sizes in Apache Spark. As it turns out, the Row object is ridiculously big compared to equivalent structures -- why is that?

Take a fairly simple schema:

root
 |-- account: string (nullable = true)
 |-- date: long (nullable = true)
 |-- dialed: string (nullable = true)
 |-- duration: double (nullable = true)

Example data looks like this:

+-------+-------------+----------+--------+
|account|         date|    dialed|duration|
+-------+-------------+----------+--------+
|   5497|1434620384003|9075112643|   790.0|
+-------+-------------+----------+--------+
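The post doesn't show how df was built. For anyone who wants to poke at this locally, here is a rough stand-in with the same column names and types -- purely an assumption on my part, with the sample row above simply repeated so that take(100) has something to return (the nullability flags may not match the original exactly):

import spark.implicits._  // assumes a SparkSession named `spark`, e.g. in spark-shell

// Hypothetical stand-in for the original DataFrame: same columns, placeholder data
val df = Seq.fill(1000)(("5497", 1434620384003L, "9075112643", 790.0))
  .toDF("account", "date", "dialed", "duration")

df.printSchema()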

So now we do:

val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]

So now I use SizeEstimator:

SizeEstimator.estimate(row) // res19: Long = 85050896 

81 megabytes! For a single row! Thinking this is some kind of mistake, I do:

SizeEstimator.estimate(df.take(100)) // res20: Long = 85072696 

Interestingly, it's not much bigger -- only about 20 KB more, despite holding 100 times as much data. Above 100 rows the growth becomes roughly linear. For 1,000 rows it looks like this:

SizeEstimator.estimate(df.take(1000)) // res21: Long = 850711696 

Ok, so that's about 10 times bigger than for 100 rows -- more or less linear, and further tests show it keeps growing linearly past that point. Based on these tests, beyond about 100 rows the cost still works out to over 800 KB per Row object!
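As a quick sanity check, the same measurements can be scripted rather than run one at a time -- a small sketch (exact numbers will vary with the data and Spark version):

import com.madhukaraphatak.sizeof.SizeEstimator

// Estimate the collected result for several row counts and print the per-row cost
Seq(1, 10, 100, 1000).foreach { n =>
  val bytes = SizeEstimator.estimate(df.take(n))
  println(s"$n rows -> $bytes bytes (~${bytes / n} bytes/row)")
}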

Out of curiosity, I tried a couple of different object types for the same underlying data. For example, here are the results for an Array of tuples instead of Row objects:

SizeEstimator.estimate(
  df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3))).take(1)
)
// res22: Long = 216

Ok, that's a little better. Even better: for 10 rows it is only 1,976 bytes, and for 100 it is only 19,616 bytes. Definitely going in the right direction.

Then I encoded the same DataFrame as an RDD[Array[Byte]], where each Array[Byte] is a binary-encoded Avro record with the same schema as the underlying DataFrame (a sketch of one way to do that encoding follows below). Then I do:

SizeEstimator.estimate(encodedRdd.take(1)) // res23: Long = 72 

72 bytes -- even better! And, for 100 rows, it's 5,216 bytes -- about 52 bytes a row, and it keeps going down from there (48,656 bytes for 1,000 records).
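The Avro encoding step itself isn't shown above. One plausible way to build such an encodedRdd with the plain Avro GenericRecord API is sketched below; the record name Call and the exact field mapping are my assumptions, not taken from the original post:

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Avro schema mirroring the DataFrame columns (hypothetical record name "Call")
val avroSchemaJson =
  """{"type":"record","name":"Call","fields":[
    |  {"name":"account","type":"string"},
    |  {"name":"date","type":"long"},
    |  {"name":"dialed","type":"string"},
    |  {"name":"duration","type":"double"}]}""".stripMargin

val encodedRdd = df.rdd.mapPartitions { rows =>
  // Parse the schema and build the writer inside the partition so the closure
  // doesn't capture anything non-serializable
  val schema = new Schema.Parser().parse(avroSchemaJson)
  val writer = new GenericDatumWriter[GenericRecord](schema)
  rows.map { r =>
    val record = new GenericData.Record(schema)
    record.put("account", r.getString(0))
    record.put("date", r.getLong(1))
    record.put("dialed", r.getString(2))
    record.put("duration", r.getDouble(3))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray  // one binary-encoded Avro record per row
  }
}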

So, at its best, a Row object weighs in at roughly 850 KB per row, whereas a binary Avro record of the same data is about 50 bytes.

What is going on??

1 Answer

Actually, Row by itself is not that big. That is why you don't see a significant change in size when you take more rows. The problem seems to be the schema information:

  1. When you collect data you actually get a GenericRowWithSchema:

    val df = Seq((1, "foo"), (2, "bar")).toDF
    df.first.getClass
    // res12: Class[_ <: org.apache.spark.sql.Row] =
    //   class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
  2. GenericRowWithSchema carries schema information via its schema argument:

    class GenericRowWithSchema(values: Array[Any],
        override val schema: StructType)
  3. Let's confirm this is really the source of the problem:

    import com.madhukaraphatak.sizeof.SizeEstimator
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

    val rowWithSchema = df.first

    val rowWithoutSchema = new GenericRowWithSchema(
      rowWithSchema.toSeq.toArray, null)

    SizeEstimator.estimate(rowWithSchema)
    // Long = 1444255708

    SizeEstimator.estimate(rowWithoutSchema)
    // Long = 120
  4. Hypothesis: the estimated size you see includes the size of the schema:

    SizeEstimator.estimate(df.schema) // Long = 1444361928 

    which is roughly the same order of magnitude as the collected rows. Let's create a new schema from scratch:

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("_1", IntegerType, false),
      StructField("_2", StringType, true)))

    val anotherRowWithSchema = new GenericRowWithSchema(
      Array(0, "foo"), schema)

    SizeEstimator.estimate(anotherRowWithSchema)
    // Long = 1444905324

    So, as you can see, the results are consistent.

  5. Why is the schema so large? Hard to say. When you take a look at the code you'll see that StructType is a complex class, not a simple schema definition, even excluding its companion object.

    It doesn't explain the reported size though. I suspect it could be some fluke in SizeEstimator, but I am not sure yet (a small illustration of how the estimator treats shared references follows after this list).

  6. You can further isolate the problem by estimating the size of a single StructField:

    import org.apache.spark.sql.types._
    import com.madhukaraphatak.sizeof.SizeEstimator

    object App {
      def main(args: Array[String]) {
        val schema = StructField("foo", IntegerType, true)
        println(SizeEstimator.estimate(schema))
        // 271872172
      }
    }
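One detail worth keeping in mind when reading the numbers above (this is the sketch referenced from point 5, my own illustration rather than part of the original answer): SizeEstimator walks the entire object graph reachable from the value you hand it, so a tiny object that merely holds a reference to a large shared structure gets charged the full size of that structure. That is essentially what happens when every collected Row points at the same big StructType. A contrived example:

import com.madhukaraphatak.sizeof.SizeEstimator

// A large shared array (~8 MB of longs) and a tiny wrapper that only references it
val shared = Array.fill(1000000)(0L)
case class Tiny(ref: Array[Long])

SizeEstimator.estimate(shared)       // large: dominated by the array itself
SizeEstimator.estimate(Tiny(shared)) // roughly as large, because the reference is followed

Whether that shared schema is then counted once or repeatedly comes down to how the estimator tracks already-visited objects, which may be the "fluke" the answer suspects.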