Thursday, May 31, 2018

PySpark: spark-submit not working like the CLI


I have a PySpark script that loads data from a TSV file and saves it both as a Parquet file and as a persistent SQL table.

When I run it line by line through the pyspark CLI, it works exactly as expected. When I run it as an application with spark-submit, it runs without any errors, but I get strange results:

1. The data is overwritten instead of appended.
2. When I run SQL queries against the table, no data is returned, even though the Parquet files are several gigabytes in size (which is what I expect).

Any suggestions?

Code:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
          StructField('subscriberId', StringType(), True),
          StructField('sourceIPv4Address', StringType(), True),
          StructField('destinationIPv4Address', StringType(), True),
          StructField('service', StringType(), True),
          StructField('baseService', StringType(), True),
          StructField('serverHostname', StringType(), True),
          StructField('rat', StringType(), True),
          StructField('userAgent', StringType(), True),
          StructField('accessPoint', StringType(), True),
          StructField('station', StringType(), True),
          StructField('device', StringType(), True),
          StructField('contentCategories', StringType(), True),
          StructField('incomingOctets', LongType(), True),
          StructField('outgoingOctets', LongType(), True),
          StructField('incomingShapingDrops', IntegerType(), True),
          StructField('outgoingShapingDrops', IntegerType(), True),
          StructField('qoeIncomingInternal', DoubleType(), True),
          StructField('qoeIncomingExternal', DoubleType(), True),
          StructField('qoeOutgoingInternal', DoubleType(), True),
          StructField('qoeOutgoingExternal', DoubleType(), True),
          StructField('incomingShapingLatency', DoubleType(), True),
          StructField('outgoingShapingLatency', DoubleType(), True),
          StructField('internalRtt', DoubleType(), True),
          StructField('externalRtt', DoubleType(), True),
          StructField('HttpUrl', StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv', sep='\t', header=True, schema=schema,
                     timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date', to_date('time_stamp'))
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)
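For reference, the two ways of running this differ in setup: the pyspark shell gives you an interactive session with contexts pre-created, while spark-submit executes the script as a standalone application. The submit command would have been along these lines (the script filename is my assumption; the post does not give it):

# Hypothetical invocation; the original post does not name the script file.
spark-submit import-ipfixminute.py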

1 Answer


As @user8371915 suggested, this is similar to:

Spark can access Hive table from pyspark but not from spark-submit

I needed to replace

from pyspark.sql import SQLContext

spark = SQLContext(sc)

with

from pyspark.sql import HiveContext

spark = HiveContext(sc)

HiveContext persists table metadata in the Hive metastore, so tables written with saveAsTable remain visible to SQL queries across applications. This resolved the issue.
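Worth adding: on Spark 2.x both SQLContext and HiveContext are superseded by SparkSession, and the equivalent fix is to build the session with Hive support enabled. A minimal sketch, reusing the question's app name (this is not from the original answer):

from pyspark.sql import SparkSession

# A Hive-enabled session registers tables written with saveAsTable in the
# Hive metastore, so later SQL queries against 'test2' can see the data.
spark = SparkSession.builder \
    .appName('import-ipfixminute') \
    .enableHiveSupport() \
    .getOrCreate()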
