
Spark is only using one worker machine when more are available


I'm trying to parallelize a machine learning prediction task via Spark. I've used Spark successfully a number of times on other tasks and haven't run into any parallelization issues before.

In this particular task, my cluster has 4 workers. I'm calling mapPartitions on an RDD with 4 partitions. The map function loads a model from disk (a bootstrap script distributes everything needed to do this; I've verified it exists on each worker machine) and performs prediction on the data points in the RDD partition.

The code runs, but only utilizes one executor. The logs for the other executors say "Shutdown hook called". On different runs of the code, it uses different machines, but only one at a time.

How can I get Spark to use multiple machines at once?

I'm using PySpark on Amazon EMR via Zeppelin notebook. Code snippets are below.

%spark.pyspark

sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")

from ModelLoader import ModelLoader
from MyClassifier import MyClassifier

def load_models():
    models_path = '/home/hadoop/models'
    model_loader = ModelLoader(models_path)
    models = model_loader.load_models()
    return models

def process_file(file_contents, models):
    filename = file_contents[0]
    filetext = file_contents[1]
    pred = MyClassifier.predict(filetext, models)
    return (filename, pred)

def process_partition(file_list):
    models = load_models()
    for file_contents in file_list:
        pred = process_file(file_contents, models)
        yield pred

all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

There are four tasks as expected, but they all run on the same executor!

I have the cluster running and can provide whatever logs are available in the Resource Manager; I just don't know yet where to look.
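In case it helps, here is a small diagnostic I can run from the same notebook to see which host each partition actually lands on (the host_of_partition helper is just for illustration):

%spark.pyspark

import socket

def host_of_partition(index, iterator):
    # Report the executor host that processed this partition; the data itself is not consumed.
    yield (index, socket.gethostname())

# If the work were spread out, this would show up to 4 distinct hostnames.
print(all_contents.mapPartitionsWithIndex(host_of_partition).collect())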

1 Answer

Answer 1

Two points to mention here (not sure if they will solve your issue though):

  1. wholeTextFiles uses WholeTextFileInputFormat, which extends CombineFileInputFormat. Because of CombineFileInputFormat, Spark will try to combine groups of small files into one partition. So if you set the number of partitions to 2, for example, you might get two partitions, but it is not guaranteed; it depends on the size of the files you are reading (see the sketch after this list).
  2. The output of wholeTextFiles is an RDD that contains an entire file in each record (and each record/file cannot be split, so it ends up in a single partition/worker). So if you are reading only one file, you will end up with the full file in one partition even though you set the minimum number of partitions to 4 in your example.
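Not a guaranteed fix, but here is a sketch of how to take that combining behavior out of the equation by forcing a reshuffle after the read (reusing the all_contents and process_partition names from your snippet):

all_contents = sc.wholeTextFiles("s3://some-path")  # minPartitions is only a hint, so it is omitted here
print(all_contents.getNumPartitions())              # check how many partitions you actually got

# repartition() performs a full shuffle, so the resulting 4 partitions can be scheduled on different executors.
spread_contents = all_contents.repartition(4)
processed_pages = spread_contents.mapPartitions(process_partition)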

