I ran into a problem processing a wide Spark DataFrame (about 9000 columns, sometimes more).
Task:
- Create a wide DF via groupBy and pivot.
- Transform the columns into a vector and feed it into KMeans from pyspark.ml.
So I built the wide frame, created the feature vector with VectorAssembler, cached it, and trained KMeans on it.
Assembling took about 11 minutes and KMeans took about 2 minutes for 7 different cluster counts, on my PC in standalone mode, for a frame of roughly 500x9000. By contrast, the same processing in pandas (pivot the df, iterate over the cluster counts) takes less than a minute.
Obviously I understand the overhead and performance hit that come with standalone mode, caching, and so on, but it still really discourages me.
Could somebody explain how I can avoid this overhead?
How do people work with wide DFs instead of using VectorAssembler and taking the performance hit?
A more formal question (to comply with SO rules) would be: how can I speed up this code?
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

%%time
tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue': 'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
rs = [i - j for i, j in list(zip(lst_levels, lst_levels[1:]))]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
Wall time: 1min 32s
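As a side note, separate from the timing question: KMeansModel.computeCost has been deprecated in more recent Spark releases, so on a newer version the per-k scoring loop could be written with pyspark.ml.evaluation.ClusteringEvaluator instead. A minimal sketch, assuming the same transformed frame as above (silhouette is a different metric from WSSSE, so the elbow-selection logic would need adapting):

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Score each candidate k with the silhouette metric on the clustered data.
evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction')
scores = []
for num in range(3, 14):
    model = KMeans(k=num, maxIter=50).fit(transformed)
    scores.append((num, evaluator.evaluate(model.transform(transformed))))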
Config:
.config("spark.sql.pivotMaxValues", "100000") \ .config("spark.sql.autoBroadcastJoinThreshold", "-1") \ .config("spark.sql.shuffle.partitions", "4") \ .config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
1 Answer
This solution is perhaps unsatisfying, but it's true: avoid using VectorAssembler with wide data. It doesn't seem built for that use case. Try to run an equivalent computation on a long format instead.
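For example, the per-user feature vectors can be built straight from the long frame, without pivot or VectorAssembler. A minimal sketch, assuming df_states has the User / ObjectPath / PropertyFlagValue columns from the question and that max-per-(User, ObjectPath) with missing pairs treated as 0 is the intended aggregation:

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors

# Index each distinct ObjectPath once; the indices play the role of the pivoted columns.
paths = [r[0] for r in df_states.select('ObjectPath').distinct().collect()]
path_idx = {p: i for i, p in enumerate(paths)}
n = len(paths)

# Same aggregation as the pivot: max flag value per (User, ObjectPath).
agg = (df_states
       .groupBy('User', 'ObjectPath')
       .agg(F.max('PropertyFlagValue').alias('value')))

def to_sparse(pairs):
    # pairs: iterable of (ObjectPath, value) for one user; absent paths stay 0.
    return Vectors.sparse(n, sorted((path_idx[p], float(v)) for p, v in pairs))

features_long = (agg.rdd
                 .map(lambda r: (r['User'], (r['ObjectPath'], r['value'])))
                 .groupByKey()
                 .mapValues(to_sparse)
                 .toDF(['User', 'features'])
                 .cache())

KMeans can be fit on features_long directly, and the sparse vectors also keep memory down if most pivoted values are zero for any given user.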
VectorAssembler records a lot of information about the column types as it evaluates the data: it builds ML attribute metadata for every input column, and with roughly 9000 columns that metadata alone gets very large.
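A rough way to see this, assuming the transformed frame from the question is still around (the exact metadata layout varies between Spark versions):

import json

# VectorAssembler stores per-column ML attribute metadata on the output column.
meta = transformed.schema['features'].metadata
print(len(json.dumps(meta)))                 # approximate size of the attached metadata
print(list(meta.get('ml_attr', {}).keys()))  # attribute groups, if present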
So now you have two problems. First, as mentioned above, the initial processing time blows up. Second, you've introduced overhead into every later step, because you're transforming a much larger monster than the original data set. See the transform function in the VectorAssembler source to see what I mean.
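If the pivoted wide frame has to stay, one alternative (not spelled out in the original answer) is to build the vectors with a plain RDD map instead of VectorAssembler, which skips the per-column metadata entirely. A minimal sketch, assuming the tmp frame from the question with all feature columns numeric after fillna(0):

from pyspark.ml.linalg import Vectors

feature_idx = [i for i, c in enumerate(tmp.columns) if c != 'User']
user_idx = tmp.columns.index('User')

# One dense vector per row, no ML attribute metadata attached to the column.
transformed = (tmp.rdd
               .map(lambda row: (row[user_idx],
                                 Vectors.dense([row[i] for i in feature_idx])))
               .toDF(['User', 'features'])
               .cache())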