Monday, January 29, 2018

multiprocessing.Pipe is even slower than multiprocessing.Queue?

Leave a Comment

I tried to benchmark the speed up of Pipe over Queue from the multiprocessing package. T thought Pipe would be faster as Queue uses Pipe internally.

Strangely, Pipe is slower than Queue when sending large numpy array. What am I missing here?

Pipe:

import sys import time from multiprocessing import Process, Pipe import numpy as np  NUM = 1000   def worker(conn):     for task_nbr in range(NUM):         conn.send(np.random.rand(400, 400, 3))     sys.exit(1)   def main():     parent_conn, child_conn = Pipe(duplex=False)     Process(target=worker, args=(child_conn,)).start()     for num in range(NUM):         message = parent_conn.recv()   if __name__ == "__main__":     start_time = time.time()     main()     end_time = time.time()     duration = end_time - start_time     msg_per_sec = NUM / duration      print "Duration: %s" % duration     print "Messages Per Second: %s" % msg_per_sec  # Took 10.86s. 

Queue

import sys import time from multiprocessing import Process from multiprocessing import Queue import numpy as np  NUM = 1000  def worker(q):     for task_nbr in range(NUM):         q.put(np.random.rand(400, 400, 3))     sys.exit(1)  def main():     recv_q = Queue()     Process(target=worker, args=(recv_q,)).start()     for num in range(NUM):         message = recv_q.get()  if __name__ == "__main__":     start_time = time.time()     main()     end_time = time.time()     duration = end_time - start_time     msg_per_sec = NUM / duration      print "Duration: %s" % duration     print "Messages Per Second: %s" % msg_per_sec  # Took 6.86s. 

2 Answers

Answers 1

You can do an experiment and put the following into your Pipe code above..

def worker(conn):     for task_nbr in range(NUM):         data = np.random.rand(400, 400, 3)     sys.exit(1)  def main():     parent_conn, child_conn = Pipe(duplex=False)     p = Process(target=worker, args=(child_conn,))     p.start()     p.join() 

This gives you the time that it takes to create the data for your test. On my system this takes about 2.9 seconds.

Under the hood the queue object implements a buffer and a threaded send. The thread is still in the same process but by using it, the data creation doesn't have to wait for the system IO to complete. It effectively parallelizes the operations. Try your Pipe code modified with some simple threading implemented (disclaimer, code here is for test only and is not production ready)..

import sys import time import threading from multiprocessing import Process, Pipe, Lock import numpy as np import copy  NUM = 1000  def worker(conn):     _conn = conn     _buf = []     _wlock = Lock()     _sentinel = object() # signal that we're done     def thread_worker():         while 1:             if _buf:                 _wlock.acquire()                 obj = _buf.pop(0)                 if obj is _sentinel: return                 _conn.send(data)                 _wlock.release()     t = threading.Thread(target=thread_worker)     t.start()     for task_nbr in range(NUM):         data = np.random.rand(400, 400, 3)         data[0][0][0] = task_nbr    # just for integrity check         _wlock.acquire()         _buf.append(data)         _wlock.release()     _wlock.acquire()     _buf.append(_sentinel)     _wlock.release()     t.join()     sys.exit(1)  def main():     parent_conn, child_conn = Pipe(duplex=False)     Process(target=worker, args=(child_conn,)).start()     for num in range(NUM):         message = parent_conn.recv()         assert num == message[0][0][0], 'Data was corrupted'          if __name__ == "__main__":     start_time = time.time()     main()     end_time = time.time()     duration = end_time - start_time     msg_per_sec = NUM / duration      print "Duration: %s" % duration     print "Messages Per Second: %s" % msg_per_sec 

On my machine this takes 3.4 seconds to run which is almost exactly the same as your Queue code above.

From https://docs.python.org/2/library/threading.html

In Cython, due to due to the Global Interpreter Lock, only one thread can execute Python code at once... however, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

The queue and pipe differences are definitely an odd implementation detail until you dig into it a bit.

Answers 2

I assume by your print command you are using Python2. However the strange behavior cannot be replicated with Python3, where Pipe is actually faster than Queue.

import sys import time from multiprocessing import Process, Pipe, Queue import numpy as np  NUM = 20000   def worker_pipe(conn):     for task_nbr in range(NUM):         conn.send(np.random.rand(40, 40, 3))     sys.exit(1)   def main_pipe():     parent_conn, child_conn = Pipe(duplex=False)     Process(target=worker_pipe, args=(child_conn,)).start()     for num in range(NUM):         message = parent_conn.recv()   def pipe_test():     start_time = time.time()     main_pipe()     end_time = time.time()     duration = end_time - start_time     msg_per_sec = NUM / duration     print("Pipe")     print("Duration: " + str(duration))     print("Messages Per Second: " + str(msg_per_sec))  def worker_queue(q):     for task_nbr in range(NUM):         q.put(np.random.rand(40, 40, 3))     sys.exit(1)  def main_queue():     recv_q = Queue()     Process(target=worker_queue, args=(recv_q,)).start()     for num in range(NUM):         message = recv_q.get()  def queue_test():     start_time = time.time()     main_queue()     end_time = time.time()     duration = end_time - start_time     msg_per_sec = NUM / duration     print("Queue")     print("Duration: " + str(duration))     print("Messages Per Second: " + str(msg_per_sec))   if __name__ == "__main__":     for i in range(2):         queue_test()         pipe_test() 

Results in:

Queue Duration: 3.44321894646 Messages Per Second: 5808.51822408 Pipe Duration: 2.69065594673 Messages Per Second: 7433.13169575 Queue Duration: 3.45295906067 Messages Per Second: 5792.13354361 Pipe Duration: 2.78426194191 Messages Per Second: 7183.23218766   ------------------ (program exited with code: 0) Press return to continue 
If You Enjoyed This, Take 5 Seconds To Share It

0 comments:

Post a Comment