I have been playing around with the java-sizeof library (https://github.com/phatak-dev/java-sizeof) and using it to measure data set sizes in Apache Spark. As it turns out, the Row object is ridiculously big. Like hugely big -- why is that?
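For reference, the library exposes a single estimate call; a minimal usage sketch (assuming the artifact from the repository above is on the classpath):

import com.madhukaraphatak.sizeof.SizeEstimator

// returns the estimated deep size of the object graph, in bytes
val bytes: Long = SizeEstimator.estimate("some object")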
Take a fairly simple schema:
root
 |-- account: string (nullable = true)
 |-- date: long (nullable = true)
 |-- dialed: string (nullable = true)
 |-- duration: double (nullable = true)

Example data looks like this:
+-------+-------------+----------+--------+
|account|         date|    dialed|duration|
+-------+-------------+----------+--------+
|   5497|1434620384003|9075112643|   790.0|
+-------+-------------+----------+--------+

So now we do:
val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]

So now I use SizeEstimator:
SizeEstimator.estimate(row)
// res19: Long = 85050896

81 megabytes! For a single row! Thinking this is some kind of mistake, I do:
SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696

Interestingly, it's not much bigger -- only about 20 KB bigger, despite holding 100 times the amount of data. Above 100 rows, the growth appears to be linear. For 1,000 rows it looks like this:
SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696

OK, so that's about 10 times bigger than 100 rows -- more or less linear, and further tests show it keeps growing linearly past 100 rows. Based on these tests, beyond about 100 rows the cost per Row object is still over 800 KB!
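To make the linearity claim concrete, a small loop like the following (a sketch, assuming the same df and the SizeEstimator import shown earlier) prints the implied per-row cost for a few sample sizes:

// estimate the collected rows for several sample sizes and print bytes per row
for (n <- Seq(1, 10, 100, 1000)) {
  val size = SizeEstimator.estimate(df.take(n))
  println(s"n=$n total=$size perRow=${size / n}")
}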
Out of curiosity, I tried a couple of different object types for the same underlying data. For example, here are the results for an Array of tuples instead of Row objects:
SizeEstimator.estimate(
  df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3))).take(1)
)
// res22: Long = 216

OK, that's a little better. Even better, for 10 rows it is only 1,976 bytes, and for 100 it is only 19,616 bytes. Definitely going in the right direction.
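A quick back-of-the-envelope calculation using the numbers above makes the gap explicit:

// implied bytes per element for the tuple representation vs. the Row representation
val perTuple10  = 1976.0 / 10         // ~198 bytes per tuple
val perTuple100 = 19616.0 / 100       // ~196 bytes per tuple
val perRow1000  = 850711696.0 / 1000  // ~850 KB per Row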
Then, I encoded the same DataFrame as an RDD[Array[Byte]] where each Array[Byte] is a binary-encoded Avro record, with the same schema as the underlying DataFrame. Then I do:
SizeEstimator.estimate(encodedRdd.take(1))
// res23: Long = 72

72 bytes -- even better! And for 100 rows it's 5,216 bytes -- about 52 bytes a row, and the per-row cost keeps going down from there (48,656 bytes for 1,000 records).
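For reference, the Avro encoding step described above can be done roughly like this. This is only a sketch, not the exact code used: the record name "Call", the schema JSON, and the field mapping are assumptions based on the DataFrame schema shown earlier.

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Avro schema mirroring the DataFrame schema (hypothetical record name)
val schemaJson = """{"type":"record","name":"Call","fields":[
  {"name":"account","type":"string"},
  {"name":"date","type":"long"},
  {"name":"dialed","type":"string"},
  {"name":"duration","type":"double"}]}"""

val encodedRdd = df.rdd.mapPartitions { rows =>
  // parse the schema on the executor so nothing non-serializable is captured
  val avroSchema = new Schema.Parser().parse(schemaJson)
  val writer = new GenericDatumWriter[GenericRecord](avroSchema)
  rows.map { r =>
    val record = new GenericData.Record(avroSchema)
    record.put("account", r.getString(0))
    record.put("date", r.getLong(1))
    record.put("dialed", r.getString(2))
    record.put("duration", r.getDouble(3))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }
}

Binary-encoded Avro carries no per-record schema, which is why each record shrinks to a few dozen bytes.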
So, at its best, a Row object weighs about 850 KB per row, whereas a binary Avro record of the same data is about 50 bytes.
What is going on??
1 Answer
Actually, Row by itself is not that big. That is why you don't see a significant change in size when you take more rows. The problem seems to be the schema information.
When you collect data you actually get GenericRowWithSchema:

val df = Seq((1, "foo"), (2, "bar")).toDF
df.first.getClass
// res12: Class[_ <: org.apache.spark.sql.Row] =
//   class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

GenericRowWithSchema carries schema information from its schema argument:

class GenericRowWithSchema(values: Array[Any], override val schema: StructType)

Let's confirm this is really the source of the problem:
import com.madhukaraphatak.sizeof.SizeEstimator
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val rowWithSchema = df.first
val rowWithoutSchema = new GenericRowWithSchema(
  rowWithSchema.toSeq.toArray, null)

SizeEstimator.estimate(rowWithSchema)
// Long = 1444255708

SizeEstimator.estimate(rowWithoutSchema)
// Long = 120

Hypothesis: the estimated size you see includes the size of the schema:
SizeEstimator.estimate(df.schema)
// Long = 1444361928

which is roughly the same order of magnitude as the collected rows. Let's create a new schema from scratch:
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("_1", IntegerType, false),
  StructField("_2", StringType, true)))

val anotherRowWithSchema = new GenericRowWithSchema(
  Array(0, "foo"), schema)

SizeEstimator.estimate(anotherRowWithSchema)
// Long = 1444905324

So as you can see, the results are consistent.
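Another way to frame the consistency check: if the schema dominates, the row estimate and the schema estimate should be of the same magnitude, with a comparatively small difference. A sketch, reusing rowWithSchema from above:

val withSchema = SizeEstimator.estimate(rowWithSchema)
val schemaOnly = SizeEstimator.estimate(df.schema)
// both are ~1.4 GB in the runs above; the difference is tiny by comparison
println(withSchema - schemaOnly)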
Why is the schema so large? Hard to say. When you take a look at the code you'll see that StructType is a complex class, not a simple schema definition, even excluding its companion object.

That doesn't explain the reported size, though. I suspect it could be some fluke in SizeEstimator, but I am not sure yet.

You can further isolate the problem by estimating the size of a single StructField:

import org.apache.spark.sql.types._
import com.madhukaraphatak.sizeof.SizeEstimator

object App {
  def main(args: Array[String]) {
    val schema = StructField("foo", IntegerType, true)
    println(SizeEstimator.estimate(schema))
    // 271872172
  }
}
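As a practical aside: if the goal is simply to gauge how much memory the data itself takes, one workaround in the spirit of the rowWithoutSchema trick above is to drop the schema reference before estimating. A sketch, assuming the original DataFrame from the question:

// estimate only the row values, without the reference to the shared StructType
val valuesOnly = df.take(1000).map(_.toSeq.toArray)
println(SizeEstimator.estimate(valuesOnly))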