I am trying to adapt this Python (pandas/statsmodels) code to PySpark:
import numpy as np


def lag_func(data, lag):
    """Return a copy of ``data`` extended with lagged copies of its ``"diff"`` column.

    Args:
        data: pandas DataFrame that already contains a ``"diff"`` column
            (for example the output of ``diff_creation``).
        lag: number of lag columns to add.

    Returns:
        A new DataFrame with the extra columns ``lag1`` ... ``lag<lag>``;
        ``data`` itself is left untouched.
    """
    # Imported here rather than at module level so that ``diff_creation``
    # remains usable when statsmodels is not installed.
    from statsmodels.tsa.tsatools import lagmat

    # Column ``c - 1`` of the lag matrix is stored as ``lag<c>``. How the
    # leading rows are padded is decided by ``lagmat``'s default arguments.
    shifted = lagmat(data["diff"], lag)
    lagged = data.copy()
    for c in range(1, lag + 1):
        lagged["lag%d" % c] = shifted[:, c - 1]
    return lagged


def diff_creation(data):
    """Add a ``"diff"`` column holding the first difference of the second column.

    Row ``i`` of ``"diff"`` is the value at column position 1 in row ``i``
    minus the value in row ``i - 1``; the first row is NaN.

    Args:
        data: pandas DataFrame whose column at position 1 is numeric.
            The frame is modified in place.

    Returns:
        The same ``data`` object, with the ``"diff"`` column added or
        overwritten.
    """
    # Create/reset the column first so that "column position 1" is resolved
    # on the same frame layout as before, even if "diff" itself ends up there.
    data["diff"] = np.nan
    # ``Series.diff`` works purely by row position. It replaces the removed
    # ``DataFrame.ix`` / ``Series.as_matrix`` calls, which additionally
    # misbehaved on integer indexes that do not start at 0.
    data["diff"] = data.iloc[:, 1].diff()
    return data
The result is a dataframe with lagged columns.
I tried something like this:
class SerieMaker(Transformer):
    """Transformer that adds lagged copies of a column within each id group.

    For every lag ``k`` in ``0 .. serieSize - 1`` a column named
    ``<outputCol><k>`` is added. Rows are grouped by ``idCol`` and ordered by
    ``dateCol``. The transformer also adds:

    * ``filled_serie`` - how many of the lags were null for the row and were
      therefore replaced by a fallback value;
    * ``rank`` - the row's rank inside its group, by ``dateCol``;
    * ``<outputCol>`` - an array holding all lag columns, from the largest
      lag down to lag 0.
    """

    def __init__(self, inputCol='f_qty_recalc', outputCol='serie',
                 dateCol='dt_ticket_sale', idCol=['id_store', 'id_sku'],
                 serieSize=30):
        """Store the column names and the number of lags to generate.

        Args:
            inputCol: name of the column to lag.
            outputCol: prefix of the per-lag columns and name of the final
                array column.
            dateCol: column used to order rows inside each group.
            idCol: column name(s) identifying a group.
            serieSize: number of lag columns to build (lags 0 .. serieSize-1).
        """
        # Let the pyspark base class initialise its own state.
        super(SerieMaker, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.dateCol = dateCol
        self.serieSize = serieSize
        # Copy list arguments so the shared default list (or a caller's list)
        # cannot be changed through this instance.
        self.idCol = list(idCol) if isinstance(idCol, list) else idCol

    def _transform(self, df):
        """Return ``df`` with the lag columns, counters and array column added."""
        window = Window.partitionBy(self.idCol).orderBy(self.dateCol)
        series = []
        df = df.withColumn('filled_serie', F.lit(0))

        # One column per lag, built from the largest lag down to lag 0.
        for index in reversed(range(0, self.serieSize)):
            # Frame of following rows used to look up a fallback value.
            # NOTE(review): the frame bounds are kept exactly as originally
            # written; confirm they select the intended rows.
            window2 = (
                Window.partitionBy(self.idCol)
                .orderBy(self.dateCol)
                .rowsBetween(self.serieSize - index, self.serieSize)
            )
            col_name = self.outputCol + '%s' % index
            series.append(col_name)

            # Build the lag expression once and reuse it below.
            lagged = F.lag(F.col(self.inputCol), index).over(window)
            is_missing = F.isnull(lagged)
            fallback = F.first(
                F.col(self.inputCol), ignorenulls=True
            ).over(window2)

            df = df.withColumn(
                col_name, F.when(is_missing, fallback).otherwise(lagged)
            )
            # Count how many lags had to be replaced for this row.
            df = df.withColumn(
                'filled_serie',
                F.when(is_missing, F.col('filled_serie') + 1)
                .otherwise(F.col('filled_serie')),
            )

        # The rank does not depend on the lag index, so compute it once.
        df = df.withColumn('rank', F.rank().over(window))

        # F.col accepts a single column name; F.array packs all lag columns
        # into one array column.
        return df.withColumn(self.outputCol, F.array(*series))
My df looks like:
+------------+--------+-------------------+------------+-------+-----+
|      id_sku|id_store|     dt_ticket_sale|f_qty_recalc|prc_sku|sales|
+------------+--------+-------------------+------------+-------+-----+
|    514655.0|    1090|2017-12-20 00:00:00|           1|   1.23| 1.23|
|    823259.0|     384|2017-12-20 00:00:00|           1|   2.79| 2.79|
My expected output is a set of lagged columns of `f_qty_recalc`,
preceded by the `id_sku`, `id_store` and date columns (not shown here):
   diff  lag1  lag2  lag3  lag4  lag5  lag6  lag7  lag8  lag9  ...  lag20  lag21  lag22  lag23  lag24  lag25  lag26  lag27  lag28  lag29
0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
1   0.0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
0 comments:
Post a Comment