I have a data set of size 10 GB (for example, Test.txt).
I wrote my PySpark script as below (Test.py):
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext

    spark = SparkSession.builder.appName("FilterProduct").getOrCreate()
    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    lines = spark.read.text("C:/Users/test/Desktop/Test.txt").rdd
    lines.collect()
Then I am executing the above script with the command below:
spark-submit Test.py --executor-memory 12G
Then I am getting an error like the one below:
17/12/29 13:27:18 INFO FileScanRDD: Reading File path: file:///C:/Users/test/Desktop/Test.txt, range: 402653184-536870912, partition values: [empty row]
17/12/29 13:27:18 INFO CodeGenerator: Code generated in 22.743725 ms
17/12/29 13:27:44 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3230)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/12/29 13:27:44 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3230)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93
Please let me know how to resolve this.
2 Answers
Answer 1
You could try --conf "spark.driver.maxResultSize=20g". You should check the available settings on the Spark configuration page: spark.apache.org/docs/latest/configuration.html. In addition, I would suggest reducing the size of your task results; otherwise you may run into serialization trouble.
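To illustrate the "reduce your task results" part, here is a minimal sketch (the filter condition, output path, and sample size are hypothetical, not from the original question) that keeps the heavy work on the executors and only brings a small sample back to the driver instead of collect()ing all 10 GB:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("FilterProduct").getOrCreate()

    # Read the file as a DataFrame; nothing is pulled to the driver yet.
    lines = spark.read.text("C:/Users/test/Desktop/Test.txt")

    # Hypothetical filter: keep only the lines containing a product keyword.
    filtered = lines.filter(lines.value.contains("product"))

    # Write the full result out in a distributed way instead of collect()ing it...
    filtered.write.mode("overwrite").text("C:/Users/test/Desktop/Test_filtered")

    # ...and only take a small sample back to the driver for inspection.
    print(filtered.take(10))

This way the driver never has to hold the whole data set, so neither the driver heap nor spark.driver.maxResultSize becomes the bottleneck.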
Answer 2
Did you check the JVM's max heap size while running spark-submit? If you see the value you passed to spark-submit, it means the max heap size was set correctly.
For example, if your setting is 4G, as in ./spark-submit --driver-memory 4G Test.py, you should see -Xmx4G on the jvisualvm screen.
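If jvisualvm is not handy, another quick sanity check (just a sketch; it only prints whatever properties the driver actually received) is to dump the memory-related settings from inside the script:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("ConfCheck").getOrCreate()
    conf = spark.sparkContext.getConf()

    # Print the effective memory settings; "not set" means Spark falls back to its default.
    for key in ("spark.driver.memory", "spark.executor.memory", "spark.driver.maxResultSize"):
        print(key, "=", conf.get(key, "not set"))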
Even if you set the max heap size correctly, you may see a new error: "Total size of serialized results of 7 tasks (1158.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)".
The stack trace is here:
18/01/10 17:08:11 ERROR TaskSetManager: Total size of serialized results of 7 tasks (1158.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
18/01/10 17:08:11 INFO BlockManagerInfo: Removed taskresult_4 on 10.0.2.151:45961 in memory (size: 176.3 MB, free: 1828.3 MB)
18/01/10 17:08:11 INFO TaskSchedulerImpl: Cancelling stage 0
18/01/10 17:08:11 INFO Executor: Executor is trying to kill task 6.0 in stage 0.0 (TID 6), reason: stage cancelled
18/01/10 17:08:11 INFO TaskSchedulerImpl: Stage 0 was cancelled
18/01/10 17:08:11 INFO DAGScheduler: ResultStage 0 (collect at /home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/bin/test.py:9) failed in 28.540 s due to Job aborted due to stage failure: Total size of serialized results of 7 tasks (1158.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
18/01/10 17:08:11 INFO DAGScheduler: Job 0 failed: collect at /home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/bin/test.py:9, took 28.833384 s
Traceback (most recent call last):
  File "/home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/bin/test.py", line 9, in <module>
    lines.collect()
  File "/home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect
  File "/home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/mehmet/dev/sdk/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError
18/01/10 17:08:11 INFO MemoryStore: Block taskresult_6 stored as bytes in memory (estimated size 176.3 MB, free 1651.7 MB)
18/01/10 17:08:11 INFO BlockManagerInfo: Added taskresult_6 in memory on 10.0.2.151:45961 (size: 176.3 MB, free: 1652.0 MB)
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 7 tasks (1158.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
You can adjust the spark.driver.maxResultSize value like this: spark-submit --driver-memory 2g --conf "spark.driver.maxResultSize=2g" Test.py
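If you prefer, the same limit can also be set when the session is built (a sketch of the programmatic equivalent; note that spark.driver.memory itself is generally only picked up from the spark-submit command line, since the driver JVM is already running by the time the script executes):

    from pyspark.sql import SparkSession

    # Programmatic equivalent of --conf "spark.driver.maxResultSize=2g".
    spark = (SparkSession.builder
             .appName("FilterProduct")
             .config("spark.driver.maxResultSize", "2g")
             .getOrCreate())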
If you still get an error, you can share the verbose output of this command: spark-submit --driver-memory 10g --conf "spark.driver.maxResultSize=2g" Test.py
so we can see your platform-specific conditions.