Monday, January 22, 2018

Separate computation from socket work in Python


I'm serializing column data and then sending it over a socket connection. Something like:

import array, struct, socket

## Socket setup
s = socket.create_connection((ip, addr))

## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)

for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .

    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list)
    s.sendall(chunk)
    s.recv(1000)    # get confirmation

I wish to separate the computation from the sending and put them on separate threads or processes, so that I can keep doing computations while data is being sent away.

I've put the binarizing part as a generator function, then sent the generator to a separate thread, which then yielded binary chunks via a queue.

I collected the data from the main thread and sent it away. Something like:

import array, struct, socket
from time import sleep

try:
    import thread
    from Queue import Queue
except:
    import _thread as thread
    from queue import Queue


## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()


def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)

    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .

        yield b''.join((columns[col_name] for col_name in ordered_col_list))


def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''

    while True:
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:
            for chunk in data_gen:
                queue.put(chunk)


## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)


## Get data back and send away
while True:
    try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:
        socket.sendall(binary_chunk)
        socket.recv(1000)  # Get confirmation

However, I did not see any performance improvement; it did not run faster.

I don't understand threads/processes very well, and my question is whether it is possible (at all, and in Python specifically) to gain anything from this type of separation, and what a good way to go about it would be, either with threads or processes (or any other way, e.g. async).

2 Answers

Answer 1

If you are trying to use concurrency to improve performance in CPython, I would strongly recommend using the multiprocessing library instead of multithreading. This is because of the GIL (Global Interpreter Lock), which can have a huge impact on execution speed (in some cases, it may cause your code to run slower than the single-threaded version). Also, if you would like to learn more about this topic, I recommend reading this presentation by David Beazley. Multiprocessing sidesteps this problem by spawning a new Python interpreter instance for each process, thus allowing you to take full advantage of a multi-core architecture.
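For illustration, here is a minimal sketch (not from the answer; crunch is just a made-up stand-in for CPU-bound work such as binarizing a chunk) of how two processes can run such work on two cores at once:

from multiprocessing import Process

def crunch(n):
    # stand-in for CPU-bound work such as binarizing a chunk
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    # each Process gets its own interpreter (and its own GIL), so the
    # two crunch() calls can execute on two cores simultaneously
    workers = [Process(target=crunch, args=(10000000,)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()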

Answer 2

You have two options for running things in parallel in Python: either use the multiprocessing (docs) library, or write the parallel code in Cython and release the GIL. The latter is significantly more work and less generally applicable.

Python threads are limited by the Global Interpreter Lock (GIL); I won't go into detail here, as you will find more than enough information online. In short, the GIL, as the name suggests, is a global lock within the CPython interpreter that ensures that multiple threads do not modify objects within that interpreter simultaneously. This is why, for instance, Cython programs can run code in parallel: they can operate outside the GIL.
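If you want to see the effect yourself, a small experiment along the lines of David Beazley's examples (this snippet is mine, not from the answer; the timings will vary by machine) shows that splitting CPU-bound work across two threads does not make it faster:

import threading, time

def count(n):
    # pure CPU-bound loop; it never releases the GIL for long
    while n > 0:
        n -= 1

N = 10000000

start = time.time()
count(N); count(N)
print('sequential :', time.time() - start)

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print('two threads:', time.time() - start)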


As to your code, one problem is that you're running both the number crunching (binarize) and the socket.send under the GIL, so they run strictly serially. The queue is also wired up very strangely, and there is a NameError, but let's leave those aside.

With the caveats already pointed out by Jeremy Friesner in mind, I suggest you restructure the code in the following manner: you have two processes (not threads), one for binarising the data and the other for sending it. In addition to those, there is also the parent process that starts both children, and a queue connecting child 1 to child 2.

  • Subprocess-1 does the number crunching and puts the crunched data onto a queue
  • Subprocess-2 consumes data from the queue and does the socket.send

In code, the setup would look something like:

from multiprocessing import Process, Queue

work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()

binarize can remain as it is in your code, except that instead of a yield at the end, you add elements to the queue:

def binarize(num_of_chunks, q):
    ''' Puts chunks of binary data on the queue. In reality it wouldn't be the same data '''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)

send_data should just be the while loop from the bottom of your code, with the connection open/close functionality

def send_data(ip, addr, q):
    s = socket.create_connection((ip, addr))
    while True:
        try:
            binary_chunk = q.get(False)
        except:
            sleep(0.1)
            continue
        else:
            s.sendall(binary_chunk)
            s.recv(1000)  # Get confirmation
    # maybe remember to close the socket before killing the process

Now you have two (three, actually, if you count the parent) processes that are processing data independently. You can force the two processes to synchronise their operations by setting the maxsize of the queue to a single element, as shown below. The operation of these two separate processes is also easy to monitor from the process manager on your computer: top on Linux, Activity Monitor on OS X, Task Manager on Windows.
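For instance (a one-line assumption about how you would apply it, not code from the answer), constructing the queue with a bound of one element makes the producer block until the consumer has taken the previous chunk:

from multiprocessing import Queue

# q.put() in the binarizing process now blocks until the sending process
# has removed the previous chunk with q.get(), keeping the two in lock-step
work_queue = Queue(maxsize=1)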


Finally, Python 3 comes with the option of using coroutines, which are neither processes nor threads but something else entirely. Coroutines are pretty cool from a CS point of view, but a bit of a head-scratcher at first. There are plenty of resources to learn from, though, like this post on Medium and this talk by David Beazley.
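To give a flavour of what that could look like here, a rough sketch of my own (not from the linked resources; it assumes Python 3.5+, and make_chunk is a hypothetical stand-in for the binarizing work) that overlaps computation with sending by pushing the CPU-bound step into a process pool:

import array
import asyncio
from concurrent.futures import ProcessPoolExecutor

def make_chunk(i):
    # stand-in for the binarizing step from the question
    return array.array('i', range(10000)).tostring()

async def produce_and_send(ip, port, num_of_chunks):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection(ip, port)
    with ProcessPoolExecutor() as pool:
        # always compute chunk i+1 in a worker process while chunk i
        # is being written and acknowledged
        pending = loop.run_in_executor(pool, make_chunk, 0)
        for i in range(num_of_chunks):
            chunk = await pending
            if i + 1 < num_of_chunks:
                pending = loop.run_in_executor(pool, make_chunk, i + 1)
            writer.write(chunk)
            await writer.drain()
            await reader.read(1000)  # confirmation, as in the original code
    writer.close()

# asyncio.get_event_loop().run_until_complete(produce_and_send(ip, port, 100))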


Even more generally, you might want to look into the producer/consumer pattern, if you are not already familiar with it.
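A bare-bones version of that pattern with multiprocessing might look like the following (the sentinel-based shutdown is my own addition, not part of the answer above):

from multiprocessing import Process, Queue

SENTINEL = None  # marks the end of the stream

def producer(q, num_items):
    for i in range(num_items):
        q.put(('item %d' % i).encode())  # stand-in for real binarized chunks
    q.put(SENTINEL)  # tell the consumer nothing more is coming

def consumer(q):
    while True:
        item = q.get()  # blocks until an item is available; no polling needed
        if item is SENTINEL:
            break
        # ... send item over the socket here ...

if __name__ == '__main__':
    q = Queue(maxsize=1)
    p = Process(target=producer, args=(q, 100))
    c = Process(target=consumer, args=(q,))
    p.start(); c.start()
    p.join(); c.join()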

