I have a PySpark job that updates some objects in HBase (Spark v1.6.0; happybase v0.9).
It sort of works if I open and close an HBase connection for each row:
def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
After a few thousand upserts, we start to see errors like this:
TTransportException: Could not connect to [hbase_master]:9090
Obviously, it's inefficient to open/close a connection for each upsert. This function is really just a placeholder for a proper solution.
I then tried to create a version of the process_row function that uses a connection pool:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        pass  # update HBase record with data from row
For some reason, the connection pool version of this function raises an error (see complete error message):
TypeError: can't pickle thread.lock objects
Can you see what I'm doing wrong?
Update
I saw this post and suspect I'm experiencing the same issue: Spark attempts to serialize the pool
object and distribute it to each of the executors, but this connection pool object cannot be shared across multiple executors.
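If that diagnosis is right, the failure can be reproduced without Spark or happybase at all. The sketch below (using a stand-in class, since happybase's pool internals are an assumption here) shows that pickling any object holding a threading lock fails the same way, which is what happens when Spark pickles the closure that references the pool:

```python
import pickle
import threading

class FakePool:
    """Stand-in for a connection pool whose internals hold a lock,
    mimicking what I suspect happybase.ConnectionPool does."""
    def __init__(self):
        self._lock = threading.Lock()

try:
    pickle.dumps(FakePool())   # what Spark does to ship the closure
except TypeError as exc:
    print(exc)                 # e.g. "cannot pickle ... lock objects"
```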
It sounds like I need to split the dataset into partitions and use one connection per partition (see Design Patterns for using foreachRDD in the Spark Streaming guide). I tried this, based on an example in the documentation:
def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        pass  # persist data for this row
    hbase_connection.close()

my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
Unfortunately, it still raises a "can't pickle thread.lock objects" error.
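For what it's worth, the per-partition pattern itself seems sound: one connection per partition, reused for every row in that partition. Here is a stripped-down, runnable sketch of that pattern with a stand-in connection class in place of happybase.Connection (the class name, host string, and row shape are all placeholders, since the real client needs a live Thrift server):

```python
opened = []  # track how many connections get created

class FakeConnection:
    """Stand-in for happybase.Connection so the pattern can run locally."""
    def __init__(self, host):
        self.host = host
        opened.append(self)
    def put(self, row_key, data):
        pass  # a real upsert against HBase would go here
    def close(self):
        pass

def persist_partition(rows, host="hbase-master"):  # placeholder host
    conn = FakeConnection(host)    # one connection per partition...
    for row in rows:
        conn.put(row["key"], row)  # ...reused for every row in it
    conn.close()

# Simulate what foreachPartition would do with 2 partitions of 3 rows:
partitions = [[{"key": str(i)} for i in range(3)] for _ in range(2)]
for part in partitions:
    persist_partition(iter(part))

print(len(opened))  # 2 connections total, not 6
```

The important detail is that the connection is created *inside* the function that runs on the executor, so nothing unpicklable is captured by the closure Spark ships.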