Sunday, September 30, 2018

Python lagged series to PySpark

Leave a Comment

I am trying to adapt this Python code to PySpark:

from statsmodels.tsa.tsatools import lagmat


def lag_func(data, lag):
    """Return a copy of *data* with columns ``lag1`` ... ``lag<lag>`` added.

    Column ``lag<c>`` holds ``data["diff"]`` shifted down by ``c`` rows, taken
    from column ``c - 1`` of the matrix built by statsmodels' ``lagmat``.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame that already has a ``"diff"`` column (e.g. from ``diff_creation``).
    lag : int
        Number of lag columns to create.

    Returns
    -------
    pandas.DataFrame
        A copy of *data* with the lag columns appended. The input is not
        modified.
    """
    lag_matrix = lagmat(data["diff"], lag)
    lagged = data.copy()
    for c in range(1, lag + 1):
        # Column c-1 of the lag matrix is the series lagged by c rows.
        lagged["lag%d" % c] = lag_matrix[:, c - 1]
    return lagged


def diff_creation(data):
    """Add a ``"diff"`` column holding the first difference of column 1.

    ``diff[i] = data.iloc[i, 1] - data.iloc[i - 1, 1]`` for ``i >= 1``, and
    ``diff[0]`` is NaN. The difference is positional: column 1 is used
    whatever its label, and rows are paired by position, not index label.
    NOTE(review): assumes the value column is at position 1 — confirm.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose column at position 1 holds numeric values.

    Returns
    -------
    pandas.DataFrame
        The same *data* object, modified in place.
    """
    # The earlier version used DataFrame.ix and .as_matrix(), both removed in
    # pandas 1.0. Series.diff() gives the same positional difference with a
    # leading NaN. .to_numpy() assigns by position without index alignment,
    # as the .as_matrix() version did.
    data["diff"] = data.iloc[:, 1].diff().to_numpy()
    return data

The result is a dataframe with lagged columns.

I tried something like this:

class SerieMaker(Transformer):
    """Build a fixed-length series of past ``inputCol`` values for every row.

    Rows are partitioned by ``idCol`` and ordered by ``dateCol``. One column
    per lag is added, named ``<outputCol><index>`` for index ``serieSize-1``
    down to ``0``. Lag 0 is the current row's value. These columns are then
    packed, most distant lag first, into a single array column ``outputCol``.

    Two helper columns are also added:

    * ``filled_serie``: the number of lags that were null for the row and
      had to be filled.
    * ``rank``: the row's rank by ``dateCol`` within its ``idCol`` partition.
    """

    def __init__(self, inputCol='f_qty_recalc', outputCol='serie',
                 dateCol='dt_ticket_sale', idCol=None, serieSize=30):
        # Initialise Params/Identifiable state (uid, param maps) of the base class.
        super(SerieMaker, self).__init__()
        # Build a fresh list per instance. A list literal as the default would
        # be shared by every instance.
        if idCol is None:
            idCol = ['id_store', 'id_sku']
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.dateCol = dateCol
        self.serieSize = serieSize
        self.idCol = idCol

    def _transform(self, df):
        """Add the lag columns, ``filled_serie``, ``rank`` and ``outputCol`` to *df*."""
        window = Window.partitionBy(self.idCol).orderBy(self.dateCol)
        value = F.col(self.inputCol)
        series = []

        df = df.withColumn('filled_serie', F.lit(0))

        # Lags from serieSize-1 down to 0, so `series` ends up ordered from
        # the most distant lag to the current row.
        for index in reversed(range(0, self.serieSize)):
            lagged = F.lag(value, index).over(window)
            # When the lag is null (it runs past the start of the partition),
            # fall back to the first non-null value in a frame of *following*
            # rows. NOTE(review): confirm this forward-looking fill is intended.
            fill_window = (Window.partitionBy(self.idCol)
                           .orderBy(self.dateCol)
                           .rowsBetween(self.serieSize - index, self.serieSize))
            fill_value = F.first(value, ignorenulls=True).over(fill_window)

            col_name = (self.outputCol + '%s' % index)
            series.append(col_name)
            # coalesce == when(isnull(lagged), fill_value).otherwise(lagged)
            df = df.withColumn(col_name, F.coalesce(lagged, fill_value))
            # Count how many lags had to be filled for this row.
            df = df.withColumn('filled_serie',
                               F.when(F.isnull(lagged), F.col('filled_serie') + 1)
                               .otherwise(F.col('filled_serie')))

        # Loop-invariant, so compute it once after every lag column exists.
        df = df.withColumn('rank', F.rank().over(window))
        # F.col() accepts a single column name. F.array packs every lag column
        # into one array column.
        return df.withColumn(self.outputCol, F.array(*series))

My DataFrame looks like this:

+------------+--------+-------------------+------------+-------+-----+
|      id_sku|id_store|     dt_ticket_sale|f_qty_recalc|prc_sku|sales|
+------------+--------+-------------------+------------+-------+-----+
|    514655.0|    1090|2017-12-20 00:00:00|           1|   1.23| 1.23|
|    823259.0|     384|2017-12-20 00:00:00|           1|   2.79| 2.79|

My expected output is a set of lags of `f_qty_recalc`, preceded by `id_sku`, `id_store` and the date (not shown here):

   diff  lag1  lag2  lag3  lag4  lag5  lag6  lag7  lag8  lag9  ...  lag20  lag21  lag22  lag23  lag24  lag25  lag26  lag27  lag28  lag29
0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
1   0.0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0

0 Answers

If You Enjoyed This, Take 5 Seconds To Share It

0 comments:

Post a Comment