I have a PySpark script that loads data from a TSV file and saves it both as a Parquet file and as a persistent SQL table.
When I run it line by line through the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors, but I get strange results:
1. The data is overwritten instead of appended.
2. When I run SQL queries against the table, I get no data returned, even though the Parquet files are several gigabytes in size (which is what I expect).
Any suggestions?
Code:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

# Explicit schema for the tab-separated input
fields = [StructField('time_stamp', TimestampType(), True),
          StructField('subscriberId', StringType(), True),
          StructField('sourceIPv4Address', StringType(), True),
          StructField('destinationIPv4Address', StringType(), True),
          StructField('service', StringType(), True),
          StructField('baseService', StringType(), True),
          StructField('serverHostname', StringType(), True),
          StructField('rat', StringType(), True),
          StructField('userAgent', StringType(), True),
          StructField('accessPoint', StringType(), True),
          StructField('station', StringType(), True),
          StructField('device', StringType(), True),
          StructField('contentCategories', StringType(), True),
          StructField('incomingOctets', LongType(), True),
          StructField('outgoingOctets', LongType(), True),
          StructField('incomingShapingDrops', IntegerType(), True),
          StructField('outgoingShapingDrops', IntegerType(), True),
          StructField('qoeIncomingInternal', DoubleType(), True),
          StructField('qoeIncomingExternal', DoubleType(), True),
          StructField('qoeOutgoingInternal', DoubleType(), True),
          StructField('qoeOutgoingExternal', DoubleType(), True),
          StructField('incomingShapingLatency', DoubleType(), True),
          StructField('outgoingShapingLatency', DoubleType(), True),
          StructField('internalRtt', DoubleType(), True),
          StructField('externalRtt', DoubleType(), True),
          StructField('HttpUrl', StringType(), True)]
schema = StructType(fields)

# Read the TSV with the explicit schema
df = spark.read.load(csv_file, format='csv', sep='\t', header=True,
                     schema=schema, timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
# Derive a date column to partition by
df = df.withColumn('date', to_date('time_stamp'))

# Append to a persistent table backed by Parquet files at parquet_dir
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)
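For reference, the empty-result symptom can be seen with a simple count query; this is a hypothetical illustration (the table name test2 comes from the script above, the exact query is not from the original post):

spark.sql('SELECT COUNT(*) FROM test2').show()
# Under spark-submit this reports a count of 0, even though the
# Parquet files under parquet_dir are several gigabytes in size.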
1 Answer
As @user8371915 suggested, this is similar to:
Spark can access Hive table from pyspark but not from spark-submit
I needed to replace
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
with
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
This resolved the issue. With a plain SQLContext the table metadata lives in a session-local catalog, while HiveContext registers the table in the persistent Hive metastore, so it remains queryable across sessions and under spark-submit.
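For what it's worth, in Spark 2.0+ both SQLContext and HiveContext are deprecated; the equivalent fix there is to build a Hive-enabled SparkSession. A minimal sketch (assuming the same app name as in the question):

from pyspark.sql import SparkSession

# enableHiveSupport() connects the session to the persistent Hive
# metastore, so saveAsTable creates a table that is visible to
# later sessions and to jobs launched with spark-submit.
spark = SparkSession.builder \
    .appName('import-ipfixminute') \
    .enableHiveSupport() \
    .getOrCreate()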