Friday, May 18, 2018

Add PySpark RDD as new column to pyspark.sql.dataframe

I have a pyspark.sql.dataframe where each row is a news article. I also have an RDD that holds the words contained in each article. I want to add the RDD of words as a column named 'words' to my dataframe of news articles. I tried

df.withColumn('words', words_rdd ) 

but I get the error

AssertionError: col should be Column 

The DataFrame looks something like this

Articles
the cat and dog ran
we went to the park
today it will rain

but I have 3k news articles.

I applied a function to clean the text, such as removing stop words, and I have an RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]] 

I'm trying to get my Dataframe to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

4 Answers

Answers 1

Disclaimer:

A Spark DataFrame in general has no strictly defined row order, so any approach that pairs rows by position relies on an assumption. Use at your own risk.

Add index to existing DataFrame:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

Add index to RDD and convert to DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

Join both and select required fields:

df_index.join(words_df, "id").select("data.*", "words") 
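To see why the shared index makes the join safe, here is a minimal plain-Python sketch of the same idea, with no Spark involved. The lists and the helper name zip_with_index are illustrative stand-ins for the DataFrame, the RDD, and RDD.zipWithIndex; it assumes both sides hold their elements in the same order, which in Spark depends on partition order rather than on a Python list.

```python
# Plain-Python model of "index both sides, then join on the index".
# `articles` and `words` are stand-ins for df and words_rdd (assumed
# to hold their elements in the same order).
articles = ["the cat and dog ran", "we went to the park", "today it will rain"]
words = [["cat", "dog", "ran"], ["we", "went", "park"], ["today", "will", "rain"]]


def zip_with_index(items):
    """Pair every element with its position, like RDD.zipWithIndex."""
    return [(item, idx) for idx, item in enumerate(items)]


# Key each side by its index so the join no longer depends on order.
articles_by_id = {idx: text for text, idx in zip_with_index(articles)}
words_by_id = {idx: tokens for tokens, idx in zip_with_index(words)}

# Inner join on the shared id.
joined = [
    (articles_by_id[idx], words_by_id[idx])
    for idx in sorted(articles_by_id.keys() & words_by_id.keys())
]

for text, tokens in joined:
    print(text, tokens)
```

Once both sides carry an explicit id, the pairing survives any shuffle that the join itself introduces.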

Caution

There are other solutions that might work in specific cases but don't guarantee performance and/or correctness. These include:

  • Using monotonically_increasing_id as a join key - not correct in the general case.
  • Using the row_number() window function as a join key - unacceptable performance implications, and in general not correct if no specific order is defined.
  • Using zip on RDDs - can work if and only if both structures have the same data distribution (should work in this case).
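The first caveat follows from how the IDs are built. According to the Spark documentation, the current implementation of monotonically_increasing_id puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below models that layout in plain Python; the function name and the partition sizes are hypothetical, chosen only for illustration.

```python
def model_monotonic_ids(partition_sizes):
    """Model the documented ID layout: partition index in the upper
    31 bits, record number within the partition in the lower 33 bits."""
    return [
        (partition << 33) + record
        for partition, size in enumerate(partition_sizes)
        for record in range(size)
    ]


# The same three rows, split across two partitions in different ways.
ids_a = model_monotonic_ids([2, 1])  # rows 0-1 in partition 0, row 2 in partition 1
ids_b = model_monotonic_ids([1, 2])  # row 0 in partition 0, rows 1-2 in partition 1

print(ids_a)  # [0, 1, 8589934592]
print(ids_b)  # [0, 8589934592, 8589934593]
```

Only row 0 pairs correctly here: key 8589934592 would join row 2 of one side to row 1 of the other, and the remaining rows find no partner at all.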

Note:

In this specific case you shouldn't need an RDD at all. pyspark.ml.feature provides a variety of Transformers, which should work well for you.

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain"],
    "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

The list of stop words can be provided using the stopWords parameter of the StopWordsRemover, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
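As a rough check of what that custom list does to the sample articles, the following plain-Python sketch imitates the two stages under their default settings as I understand them (lowercasing and splitting on whitespace for RegexTokenizer, case-insensitive matching for StopWordsRemover). The helper names are hypothetical and this is not Spark code.

```python
import re

STOP_WORDS = ["the", "and", "we", "to", "it"]


def tokenize(text):
    """Lowercase and split on runs of whitespace, dropping empty tokens."""
    return [token for token in re.split(r"\s+", text.lower()) if token]


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens that appear in the stop-word list, ignoring case."""
    blocked = {word.lower() for word in stop_words}
    return [token for token in tokens if token.lower() not in blocked]


articles = ["the cat and dog ran", "we went to the park", "today it will rain"]
cleaned = [remove_stop_words(tokenize(article)) for article in articles]

for article, tokens in zip(articles, cleaned):
    print(article, "->", tokens)
# the cat and dog ran -> ['cat', 'dog', 'ran']
# we went to the park -> ['went', 'park']
# today it will rain -> ['today', 'will', 'rain']
```

With this list "will" survives in the third article, which is closer to the output asked for in the question than the default stop-word list is.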

Answers 2

Why do you want to join the RDD back to the dataframe? I would rather create a new column from "Articles" directly. There are multiple ways to do it; here are my 5 cents:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext

sqlCtx = SQLContext(sc)    # sc is the sparkcontext

x = [Row(Articles='the cat and dog ran'),
     Row(Articles='we went to the park'),
     Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

# Spark 2.x DataFrames have no .map, so go through the underlying RDD
df2 = df.rdd.map(lambda x: (x.Articles, x.Articles.split(' '))).toDF(['Articles', 'words'])
df2.show()

You get the following output:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]

Let me know if you were looking to achieve something else.

Answers 3

A simple but effective approach would be to use a udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain", None],
    "string"
).toDF("Articles")

# The udf returns a list of strings, so declare ArrayType(StringType())
split_words = udf(lambda x: x.split(' ') if x is not None else x, ArrayType(StringType()))
df = df.withColumn('Words', split_words(df['Articles']))

df.show(10, False)
>>
+-------------------+-------------------------+
|Articles           |Words                    |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain]  |
|null               |null                     |
+-------------------+-------------------------+

I added a check for None because it is very common to have bad lines in your data. You can easily drop them with dropna, either before or after splitting.
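Because the None check lives in ordinary Python, it can be tried out locally before it is wrapped in udf. A small sketch, with split_or_none as a hypothetical named version of the lambda used above:

```python
def split_or_none(text):
    """Split on single spaces; pass None through unchanged."""
    return text.split(" ") if text is not None else None


rows = ["the cat and dog ran", None]
print([split_or_none(row) for row in rows])
# [['the', 'cat', 'and', 'dog', 'ran'], None]
```

In Spark the same function could be passed to udf in place of the lambda.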

But in my opinion, if you are doing this as a preparation task for text analytics, it would probably be in your best interest to build a Pipeline, as @user9613318 suggests in his answer.

Answers 4

rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])
# make some transformation on rdd1:
rdd2 = rdd1.map(lambda n: True if n % 2 else False)
# Append each row in rdd2 to those in rdd1.
rdd1.zip(rdd2).collect()
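RDD.zip assumes both RDDs have the same number of partitions and the same number of elements in each partition, which holds when one RDD is a map of the other. Here is a minimal plain-Python model of that requirement; partitions are represented as nested lists and zip_partitions is a hypothetical helper, not a Spark API.

```python
def zip_partitions(left, right):
    """Zip two 'RDDs' given as lists of partitions, partition by partition."""
    if len(left) != len(right):
        raise ValueError("different number of partitions")
    pairs = []
    for part_left, part_right in zip(left, right):
        if len(part_left) != len(part_right):
            raise ValueError("different number of elements in a partition")
        pairs.extend(zip(part_left, part_right))
    return pairs


rdd1 = [[1, 2], [3, 5]]                               # two partitions
rdd2 = [[n % 2 == 1 for n in part] for part in rdd1]  # a map keeps the layout

print(zip_partitions(rdd1, rdd2))
# [(1, True), (2, False), (3, True), (5, True)]

# The same values laid out differently cannot be zipped.
try:
    zip_partitions(rdd1, [[True], [False, True, True]])
except ValueError as error:
    print("zip failed:", error)
```

This is why zip is fine when the second RDD is derived from the first by map, and risky when the two were built or repartitioned independently.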