Friday, December 15, 2017

Why does PySpark fail with random “Socket is closed” error?


I just went through a PySpark training course and I'm compiling a script of example lines of code (which explains why the code block does nothing). Every time I run this code, I get this error once or twice. The line which throws it changes between runs. I've tried setting spark.executor.memory and spark.executor.heartbeatInterval, but the error persists. I've also tried putting .cache() at the end of various lines, with no changes.

The error:

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
java.net.SocketException: Socket is closed
        at java.net.Socket.shutdownOutput(Socket.java:1551)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
        at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

The code:

from pyspark import SparkConf, SparkContext

def parseLine(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

def parseGraphs(line):
    fields = line.split()
    return (fields[0]), [int(n) for n in fields[1:]]

# putting the [*] after local makes it run one executor on each core of your local PC
conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName")

sc = SparkContext(conf = conf)

# parse the raw data and map it to an rdd.
# each item in this rdd is a tuple
# two methods to get the exact same data:
########## All of these methods can use lambda or full methods in the same way ##########
# read in a text file
customerOrdersLines = sc.textFile("file:///SparkCourse/customer-orders.csv")
customerOrdersRdd = customerOrdersLines.map(parseLine)
customerOrdersRdd = customerOrdersLines.map(lambda l: (int(l.split(',')[0]), float(l.split(',')[2])))
print customerOrdersRdd.take(1)

# countByValue groups identical values and counts them
salesByCustomer = customerOrdersRdd.map(lambda sale: sale[0]).countByValue()
print salesByCustomer.items()[0]

# use flatMap to cut everything up by whitespace
bookText = sc.textFile("file:///SparkCourse/Book.txt")
bookRdd = bookText.flatMap(lambda l: l.split())
print bookRdd.take(1)

# create key/value pairs that will allow for more complex uses
names = sc.textFile("file:///SparkCourse/marvel-names.txt")
namesRdd = names.map(lambda line: (int(line.split('\"')[0]), line.split('\"')[1].encode("utf8")))
print namesRdd.take(1)

graphs = sc.textFile("file:///SparkCourse/marvel-graph.txt")
graphsRdd = graphs.map(parseGraphs)
print graphsRdd.take(1)

# this will append "extra text" to each name.
# this is faster than a normal map because it doesn't give you access to the keys
extendedNamesRdd = namesRdd.mapValues(lambda heroName: heroName + "extra text")
print extendedNamesRdd.take(1)

# not the best example because the costars is already a list of integers
# but this should return a list, which will update the values
flattenedCostarsRdd = graphsRdd.flatMapValues(lambda costars: costars)
print flattenedCostarsRdd.take(1)

# put the heroes in ascending index order
sortedHeroes = namesRdd.sortByKey()
print sortedHeroes.take(1)

# to sort heroes by alphabetical order, we switch key/value to value/key, then sort
alphabeticalHeroes = namesRdd.map(lambda (key, value): (value, key)).sortByKey()
print alphabeticalHeroes.take(1)

# make sure that "spider" is in the name of the hero
spiderNames = namesRdd.filter(lambda (id, name): "spider" in name.lower())
print spiderNames.take(1)

# reduce by key keeps the key and performs aggregation methods on the values.  in this example, taking the sum
combinedGraphsRdd = flattenedCostarsRdd.reduceByKey(lambda value1, value2: value1 + value2)
print combinedGraphsRdd.take(1)

# broadcast: this is accessible from any executor
sentData = sc.broadcast(["this can be accessed by all executors", "access it using sentData"])

# accumulator:  this is synced across all executors
hitCounter = sc.accumulator(0)

1 Answer

DISCLAIMER: I haven't spent enough time on that part of Spark's codebase, but let me give you some hints that may lead to a solution. What follows only explains where to search for more information; it is not a solution to the issue.


The exception you are facing seems to be caused by some other issue. The frame java.net.Socket.shutdownOutput(Socket.java:1551) in the stack trace corresponds to the call to worker.shutdownOutput() in PythonRDD.scala, which fails because the socket has already been closed.

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
java.net.SocketException: Socket is closed
        at java.net.Socket.shutdownOutput(Socket.java:1551)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
        at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

That leads me to believe that the ERROR is a follow-up to some other earlier error.
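To illustrate why this looks like a follow-up error: shutting down the output side of a socket fails whenever the socket was already closed, whatever closed it. The sketch below reproduces that pattern in plain Python 3. It is only an analogy, not Spark's code, and the function name is made up for the illustration.

```python
import socket


def shutdown_after_close():
    """Close a socket, then try to shut down its output side.

    Returns the exception raised by the late shutdown call, or None.
    """
    left, right = socket.socketpair()
    try:
        left.close()  # stands in for whatever closed the socket first
        try:
            left.shutdown(socket.SHUT_WR)  # the late "shutdownOutput" call
        except OSError as exc:
            return exc
        return None
    finally:
        right.close()


if __name__ == "__main__":
    print(repr(shutdown_after_close()))
```

The late shutdown call is only the messenger here; the interesting question is what closed the socket beforehand.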

stdout writer for python is the name of the thread that (uses the EvalPythonExec physical operator and) is responsible for the communication between Spark and pyspark (so you can execute Python code without many changes).

As a matter of fact, the scaladoc of EvalPythonExec gives quite a lot of information on the underlying communication infrastructure that pyspark uses internally, which relies on sockets to talk to an external Python process.

Python evaluation works by sending the necessary (projected) input data via a socket to an external Python process, and combine the result from the Python process with the original row.
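The round trip described in that scaladoc can be sketched in plain Python 3 as follows. This is a toy model under stated assumptions, not Spark's implementation: a thread stands in for the external Python process, rows are newline-free strings, and the names python_worker and evaluate are hypothetical.

```python
import socket
import threading


def python_worker(conn):
    """Stand-in for the external Python process: upper-cases each input line."""
    with conn:
        reader = conn.makefile("r", encoding="utf-8")
        # Read until the sender shuts down its output side (EOF).
        results = [line.rstrip("\n").upper() for line in reader]
        reader.close()
        conn.sendall("\n".join(results).encode("utf-8"))


def evaluate(rows):
    """Send rows over a socket, then pair each row with the worker's result."""
    driver_side, worker_side = socket.socketpair()
    thread = threading.Thread(target=python_worker, args=(worker_side,))
    thread.start()
    with driver_side:
        driver_side.sendall("\n".join(rows).encode("utf-8"))
        driver_side.shutdown(socket.SHUT_WR)  # signal "no more input"
        chunks = []
        while True:
            chunk = driver_side.recv(4096)
            if not chunk:  # worker closed its side: all results received
                break
            chunks.append(chunk)
    thread.join()
    results = b"".join(chunks).decode("utf-8").split("\n")
    return list(zip(rows, results))


if __name__ == "__main__":
    print(evaluate(["spark", "pyspark"]))
```

Note that the sender shuts down its output side once all input has been written, which is the same kind of call that fails in the stack trace when the socket is already closed.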

Moreover, python is the default executable unless overridden using PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON (as you can see in the pyspark shell script). That is the name that appears at the end of the name of the failing thread.
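A rough model of that fallback, written as plain Python 3 so it can be run without Spark. The helper names are hypothetical, and the detail that the driver falls back to the PYSPARK_PYTHON value before falling back to python is my assumption about the shell script, so check it against the script shipped with your Spark version.

```python
import os


def resolve_python_executables(env=None):
    """Return (driver_python, worker_python) using the assumed fallback order."""
    env = os.environ if env is None else env
    # Assumption: an unset or empty variable means "use the default".
    worker_python = env.get("PYSPARK_PYTHON") or "python"
    driver_python = env.get("PYSPARK_DRIVER_PYTHON") or worker_python
    return driver_python, worker_python


def writer_thread_name(worker_python):
    """Build a thread name of the shape seen in the log line."""
    return "stdout writer for " + worker_python


if __name__ == "__main__":
    driver, worker = resolve_python_executables()
    print(driver, worker, writer_thread_name(worker))
```

With neither variable set, the sketch yields python for both, which matches the thread name in the error.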

16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python

I'd recommend checking out the version of python on your system using the following command.

python -c 'import sys; print(sys.version_info)' 

That should be Python 2.7+, but it could be that you are using the very latest Python, which is not well tested with Spark. I'm guessing...
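If you prefer a script over the one-liner, here is a small sketch that also reports which executable is actually running, which helps when several Pythons are installed. The function name and the returned keys are made up for this example; the 2.7 threshold is just the one mentioned above.

```python
import sys


def describe_interpreter(version_info=None):
    """Summarise an interpreter version (defaults to the running one)."""
    info = sys.version_info if version_info is None else version_info
    version = tuple(info[:3])
    return {
        "executable": sys.executable,  # path of the interpreter running this script
        "version": ".".join(str(part) for part in version),
        "at_least_2_7": version >= (2, 7),
    }


if __name__ == "__main__":
    print(describe_interpreter())
```

Run it with the same executable that pyspark picks up, otherwise it describes a different interpreter.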


You should include the entire log of the pyspark application's execution and that's where I'd expect to find the answer.
