Saturday, February 25, 2017

How to know which stage of a job is currently running in Apache Spark?


Suppose I have a job in Spark as follows:

CSV File ==> Filter By A Column ==> Taking Sample ==> Save As JSON
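
For reference, a minimal sketch of such a pipeline (the input path, column name, and filter value are made up for illustration):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("ProgressDemo").getOrCreate();

Dataset<Row> csv = spark.read().option("header", "true").csv("/path/to/input.csv");  // CSV File
Dataset<Row> filtered = csv.filter(col("someColumn").equalTo("someValue"));          // Filter By A Column
Dataset<Row> sampled = filtered.sample(false, 0.1);                                  // Taking Sample
sampled.write().json("/path/to/output");                                             // Save As JSON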

Now my requirement is: how do I programmatically know which step of the job (fetching the file, filtering, or sampling) is currently executing (preferably using the Java API)? Is there any way to do this?

I can track jobs, stages, and tasks using the SparkListener class, for example by tracking a stage ID. But how do I know which stage ID belongs to which step in the job chain?

What I want is to send a notification to the user when, say, Filter By A Column has completed. For that I wrote a class that extends SparkListener, but I cannot find out where to get the name of the currently executing transformation. Is it possible to track this at all?

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskStart;

public class ProgressListener extends SparkListener {

    @Override
    public void onJobStart(SparkListenerJobStart jobStart)
    {
    }

    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted)
    {
        //System.out.println("Stage Name : " + stageSubmitted.stageInfo().getStatusString()); giving action name only
    }

    @Override
    public void onTaskStart(SparkListenerTaskStart taskStart)
    {
        //no such method like taskStart.name()
    }
}
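
For context, a minimal sketch of registering such a listener; SparkContext.addSparkListener and StageInfo.name() are the APIs used, while the master/app name are just examples. The stage name is a call-site string (e.g. "count at <console>:25"), which is the closest built-in hint to which step a stage belongs to:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;

JavaSparkContext jsc = new JavaSparkContext("local[*]", "ProgressDemo");
jsc.sc().addSparkListener(new SparkListener() {
    @Override
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
        // StageInfo.name() returns a call-site string such as "filter at MyJob.java:20"
        System.out.println("Stage submitted: " + stageSubmitted.stageInfo().name());
    }
});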

2 Answers

Answer 1

You cannot exactly know when, e.g., the filter operation starts or finishes.

That's because you have transformations (filter, map, ...) and actions (count, foreach, ...). Spark will put as many operations into one stage as possible. Then the stage is executed in parallel on the different partitions of your input. And here comes the problem.

Assume you have several workers and the following program:

LOAD ==> MAP ==> FILTER ==> GROUP BY + Aggregation

This program will probably have two stages: the first stage will load the file and apply the map and filter. Then the output will be shuffled to create the groups. In the second stage the aggregation will be performed.
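
To make the stage split concrete, here is a small sketch of a program with the same shape, written as an RDD pipeline (the input path and key extraction are placeholders); rdd.toDebugString() prints the lineage, and the shuffle introduced by the aggregation marks the boundary between the two stages:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext jsc = new JavaSparkContext("local[*]", "StageDemo");

JavaRDD<String> lines = jsc.textFile("/path/to/input.csv");                        // LOAD
JavaPairRDD<String, Integer> mapped =
        lines.mapToPair(line -> new Tuple2<>(line.split(",")[0], 1));              // MAP
JavaPairRDD<String, Integer> filtered = mapped.filter(t -> t._2() > 0);            // FILTER
JavaPairRDD<String, Integer> counts = filtered.reduceByKey(Integer::sum);          // GROUP BY + Aggregation

// The shuffle introduced by reduceByKey is the stage boundary:
// LOAD/MAP/FILTER run in the first stage, the aggregation in the second.
System.out.println(counts.toDebugString());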

Now, the problem is that you have several workers and each will process a portion of your input data in parallel. That is, every executor in your cluster will receive a copy of your program (the current stage) and execute it on its assigned partition.

You see, you will have multiple instances of your map and filter operators that are executed in parallel, but not necessarily at the same time. In an extreme case, worker 1 will finish with stage 1 before worker 20 has started at all (and therefore finish with its filter operation before worker 20).

For RDDs, Spark uses the iterator model inside a stage. For Datasets in recent Spark versions, however, a single loop over the partition is generated that executes the transformations inside it. This means that in this case Spark itself does not really know when a transformation operator has finished for a single task!

Long story short:

  1. You are not able to know when an operation inside a stage finishes.
  2. Even if you could, there are multiple instances that will finish at different times.

That said, I have faced the same problem myself:

In our Piglet project (please allow some advertisement ;-) ) we generate Spark code from Pig Latin scripts and wanted to profile the scripts. I ended up inserting a mapPartitions operator between all user operators; it sends the partition ID and the current time to a server, which evaluates the messages. However, this solution also has its limitations... and I'm not completely satisfied yet.
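
A rough sketch of that instrumentation idea (not the actual Piglet code; mapPartitionsWithIndex is used here to get hold of the partition ID, filtered stands for the RDD produced by the user operator you want to time, and a println stands in for sending to the profiling server):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

// Pass-through step inserted after a user operator (here: the filter).
// It drains the upstream iterator for its partition, reports the partition id
// and the time, and then hands the buffered data on unchanged.
JavaRDD<String> instrumented = filtered.mapPartitionsWithIndex((partitionId, it) -> {
    List<String> buffer = new ArrayList<>();
    it.forEachRemaining(buffer::add);                   // forces the upstream operator for this partition
    System.out.println("filter done for partition " + partitionId
            + " at " + System.currentTimeMillis());     // Piglet sends this to a profiling server instead
    return buffer.iterator();
}, true);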

However, unless you are able to modify the programs, I'm afraid you cannot achieve what you want.

Answer 2

Did you consider this option: http://spark.apache.org/docs/latest/monitoring.html
It seems you can use the following REST API to get the state of a certain job: /applications/[app-id]/jobs/[job-id]

You can set a job group ID and description so that you can track which job group is being handled, i.e. with setJobGroup.

Assuming you set the job group ID to "1" and the description to "Test job":

sc.setJobGroup("1", "Test job") 
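
Since the question asks for the Java API, the equivalent call on a JavaSparkContext (the variable name jsc is just an example) would be:

// Tag all jobs submitted from this thread with a group id and description.
jsc.setJobGroup("1", "Test job");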

When you call http://localhost:4040/api/v1/applications/[app-id]/jobs/[job-id]

you'll get a JSON response with a descriptive name for that job:

{   "jobId" : 3,   "name" : "count at <console>:25",   "description" : "Test Job",   "submissionTime" : "2017-02-22T05:52:03.145GMT",   "completionTime" : "2017-02-22T05:52:13.429GMT",   "stageIds" : [ 3 ],   "jobGroup" : "1",   "status" : "SUCCEEDED",   "numTasks" : 4,   "numActiveTasks" : 0,   "numCompletedTasks" : 4,   "numSkippedTasks" : 0,   "numFailedTasks" : 0,   "numActiveStages" : 0,   "numCompletedStages" : 1,   "numSkippedStages" : 0,   "numFailedStages" : 0 } 