Monday, May 8, 2017

How to use PySpark to load a rolling window from daily files?


I have a large number of fairly large daily files stored in a blob storage engine (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to perform a rolling N-day linear regression, but I am having trouble with the data loading aspect. I am not sure how to do this without nested RDDs. The schema for every .csv file is the same.

In other words, for every date d_t, I need the data x_t and need to join it with the data (x_{t-1}, x_{t-2}, ..., x_{t-N}).

How can I use PySpark to load an N-day window of these daily files? All of the PySpark examples I can find seem to load from one very large file or data set.

Here's an example of my current code:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'),
         ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)

def test_run(date_range):
    dt0 = date_range[-1]  # get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]]  # get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run)
p.map(test_run)  # fails with the same error as p.filter

I'm on PySpark version 2.1.0.

I'm running this in a Jupyter notebook on an Azure HDInsight cluster.

spark here is of type <class 'pyspark.sql.session.SparkSession'>.

A smaller, more reproducible example is as follows:

p = sc.parallelize([1, 2, 3])

def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1

p.filter(foo).count()

1 Answer

Answer 1

You are better off using DataFrames rather than RDDs. The DataFrame read.csv API accepts a list of paths, like this:

pathList = ['/path/to/data1900-01-01.csv', '/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)

Have a look at the documentation for read.csv.

You can build the list of paths to your data files by doing some date arithmetic over a window of N days, e.g. "path/to/data" + datetime.today().strftime("%Y-%m-%d") + ".csv" (this gives you today's file name only, but it is not hard to extend the date calculation to N days, as sketched below).
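For example, a minimal sketch of building a trailing N-day path list with the standard datetime module (N and the end date are assumptions; the '/daily/data{}.csv' pattern is taken from the question):

from datetime import date, timedelta

N = 5                   # assumed window length
end = date(1995, 1, 5)  # assumed target date d_t

# trailing window d_t-(N-1) .. d_t, oldest first
date_range = [(end - timedelta(days=i)).strftime('%Y-%m-%d')
              for i in range(N - 1, -1, -1)]
path_list = ['/daily/data{}.csv'.format(dt) for dt in date_range]
# path_list == ['/daily/data1995-01-01.csv', ..., '/daily/data1995-01-05.csv']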

However, keep in mind that the schema of all the daily CSVs must be the same for the above to work.

Edit: When you parallelize the list of dates, i.e. p, each element gets processed individually by the executors, so the input to test_run isn't the whole list of date ranges; each call receives one element, e.g. the tuple ('1995-01-03', '1995-01-04', '1995-01-05'). More importantly, the SparkSession (spark) only exists on the driver, so spark.read.csv cannot be called inside a function that runs on the executors, which is why both p.filter and p.map fail.

Try this instead and see if it works:

# Get the list of dates for one window (window() is a placeholder helper you supply)
date_range = window(dates, N)
s = '/daily/data{}.csv'

dt0 = date_range[-1]  # most recent file
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')

# read the previous files in the window
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')

# computeLinearRegression() is a placeholder for your regression step
r, resid = computeLinearRegression(df0, df1)
r.write.save('/daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
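For the rolling run itself, one option is a plain loop on the driver that repeats the steps above for each window; a rough sketch, where N and the date range are assumptions and spark is the existing SparkSession from the question:

from datetime import date, timedelta

N = 3                     # assumed window length
first = date(1995, 1, 5)  # assumed first target date d_t
last = date(1995, 1, 31)  # assumed last target date

s = '/daily/data{}.csv'
d = first
while d <= last:
    # trailing window ending on d, oldest first
    date_range = [(d - timedelta(days=i)).strftime('%Y-%m-%d')
                  for i in range(N - 1, -1, -1)]
    dt0 = date_range[-1]
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    df1 = spark.read.csv([s.format(dt) for dt in date_range[:-1]],
                         header=True, mode='DROPMALFORMED')
    # ... run the regression on df0/df1 and save the results as above ...
    d += timedelta(days=1)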
