I have a pyspark.sql.DataFrame where each row is a news article. I also have an RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my DataFrame of news articles. I tried
df.withColumn('words', words_rdd )
but I get the error
AssertionError: col should be Column
The DataFrame looks something like this:

Articles
the cat and dog ran
we went to the park
today it will rain
but I have 3k news articles.
I applied a function to clean the text, such as removing stop words, and I have an RDD that looks like this:
[[cat, dog, ran],[we, went, park],[today, will, rain]]
I'm trying to get my DataFrame to look like this:

Articles               Words
the cat and dog ran    [cat, dog, ran]
we went to the park    [we, went, park]
today it will rain     [today, will, rain]
4 Answers
Answer 1
Disclaimer: a Spark DataFrame in general has no strictly defined order. Use at your own risk.

Add an index to the existing DataFrame:
from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)
Add an index to the RDD and convert it to a DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)
Join both and select required fields:
df_index.join(words_df, "id").select("data.*", "words")
Caution
There are different solutions which might work in specific cases but don't guarantee performance and/or correctness. These include:
- Using monotonically_increasing_id as a join key - in the general case not correct.
- Using the row_number() window function as a join key - unacceptable performance implications and in general not correct if there is no specific order defined.
- Using zip on RDDs - can work if and only if both structures have the same data distribution (should work in this case; see the sketch after this list).
Note: in this specific case you shouldn't need an RDD at all. pyspark.ml.feature provides a variety of Transformers, which should work well for you.
from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain"],
    "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()

# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+
The list of stop words can be provided using the stopWords parameter of the StopWordsRemover, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
Answer 2
Why do you want to join the RDD back to the DataFrame? I would rather create a new column from "Articles" directly. There are multiple ways to do it; here are my five cents:
from pyspark.sql import Row
from pyspark.sql.context import SQLContext

sqlCtx = SQLContext(sc)  # sc is the SparkContext

x = [Row(Articles='the cat and dog ran'),
     Row(Articles='we went to the park'),
     Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

# map over the underlying RDD, split each article into words,
# and convert back to a DataFrame with the new column
df2 = df.rdd.map(lambda x: (x.Articles, x.Articles.split(' '))).toDF(['Articles', 'words'])
df2.show()
You get the following output:
Articles               words
the cat and dog ran    [the, cat, and, dog, ran]
we went to the park    [we, went, to, the, park]
today it will rain     [today, it, will, rain]
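Alternatively, a minimal sketch using the built-in split function from pyspark.sql.functions, which avoids the round trip through the RDD API (column names match the example above):

from pyspark.sql.functions import split, col

# split returns an array column, so no UDF or RDD conversion is needed
df2 = df.withColumn('words', split(col('Articles'), ' '))
df2.show(truncate=False)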
Let me know if you were looking to achieve something else.
Answer 3
A simple but effective approach would be to use a UDF. You can:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain", None],
    "string"
).toDF("Articles")

# the UDF returns a list of words, so the return type is an array of strings
split_words = udf(lambda x: x.split(' ') if x is not None else x, ArrayType(StringType()))
df = df.withColumn('Words', split_words(df['Articles']))
df.show(10, False)

>> +-------------------+-------------------------+
   |Articles           |Words                    |
   +-------------------+-------------------------+
   |the cat and dog ran|[the, cat, and, dog, ran]|
   |we went to the park|[we, went, to, the, park]|
   |today it will rain |[today, it, will, rain]  |
   |null               |null                     |
   +-------------------+-------------------------+
I added a check for None because it is very common to have bad lines in your data. You can drop them easily, after splitting or before, with dropna.
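For example, a quick sketch of dropping the bad rows before splitting (the subset name matches the column above):

# drop rows where Articles is null before applying the UDF
df_clean = df.dropna(subset=['Articles'])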
But in my opinion, if you want to do this as a preparation task for text analytics, it would probably be in your best interest to build a Pipeline as @user9613318 suggests in his answer.
Answer 4
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])

# make some transformation on rdd1
rdd2 = rdd1.map(lambda n: True if n % 2 else False)

# Append each row in rdd2 to those in rdd1.
rdd1.zip(rdd2).collect()
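If you then need a DataFrame rather than a collected list, a small sketch (the column names here are illustrative):

# Convert the zipped (value, flag) pairs into a two-column DataFrame.
df_zipped = rdd1.zip(rdd2).toDF(['value', 'is_odd'])
df_zipped.show()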