I tried to benchmark the speedup of Pipe over Queue from the multiprocessing package. I thought Pipe would be faster, as Queue uses Pipe internally.
Strangely, Pipe is slower than Queue when sending large numpy arrays. What am I missing here?
Pipe:
    import sys
    import time
    from multiprocessing import Process, Pipe

    import numpy as np

    NUM = 1000


    def worker(conn):
        for task_nbr in range(NUM):
            conn.send(np.random.rand(400, 400, 3))
        sys.exit(1)


    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()


    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration

        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec

    # Took 10.86s.
Queue:
    import sys
    import time
    from multiprocessing import Process
    from multiprocessing import Queue

    import numpy as np

    NUM = 1000


    def worker(q):
        for task_nbr in range(NUM):
            q.put(np.random.rand(400, 400, 3))
        sys.exit(1)


    def main():
        recv_q = Queue()
        Process(target=worker, args=(recv_q,)).start()
        for num in range(NUM):
            message = recv_q.get()


    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration

        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec

    # Took 6.86s.
2 Answers
Answer 1
As an experiment, replace worker and main in your Pipe code above with the following:
    def worker(conn):
        for task_nbr in range(NUM):
            data = np.random.rand(400, 400, 3)
        sys.exit(1)


    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        p = Process(target=worker, args=(child_conn,))
        p.start()
        p.join()
This gives you the time that it takes to create the data for your test. On my system this takes about 2.9 seconds.
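To see how the cost splits between creating the data and serializing it (Connection.send pickles the object before writing it to the pipe), the two stages can be timed separately. The snippet below is only a rough Python 3 sketch under stated assumptions: make_payload and time_stages are hypothetical helpers, and a plain list of floats stands in for the numpy array so that it runs with the standard library alone. Absolute numbers with real numpy arrays will differ.

```python
import pickle
import random
import time


def make_payload(n):
    """Stand-in for np.random.rand: a list of n random floats (assumption)."""
    return [random.random() for _ in range(n)]


def time_stages(n_items, repeats):
    """Return (seconds spent creating data, seconds spent pickling it)."""
    create = 0.0
    serialize = 0.0
    for _ in range(repeats):
        t0 = time.perf_counter()
        payload = make_payload(n_items)
        t1 = time.perf_counter()
        # Roughly what a send has to do before any bytes reach the pipe.
        pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
        t2 = time.perf_counter()
        create += t1 - t0
        serialize += t2 - t1
    return create, serialize


if __name__ == "__main__":
    create, serialize = time_stages(n_items=10000, repeats=20)
    print("create: %.4fs, pickle: %.4fs" % (create, serialize))
```

If creation accounts for a large share of the total, overlapping it with the send is where the time can be won back.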
Under the hood, the Queue object implements a buffer and a threaded send. The thread is still in the same process, but by using it, the data creation doesn't have to wait for the system I/O to complete. It effectively parallelizes the two operations. Try your Pipe code modified with some simple threading (disclaimer: the code here is for testing only and is not production ready):
    import sys
    import time
    import threading
    from multiprocessing import Process, Pipe

    import numpy as np

    NUM = 1000


    def worker(conn):
        _conn = conn
        _buf = []
        _wlock = threading.Lock()
        _sentinel = object()  # signal that we're done

        def thread_worker():
            while 1:
                if not _buf:
                    continue  # busy-wait; acceptable for a test only
                with _wlock:
                    obj = _buf.pop(0)
                if obj is _sentinel:
                    return
                # Send the popped item (not the producer's latest array),
                # and do it outside the lock so the producer is not blocked.
                _conn.send(obj)

        t = threading.Thread(target=thread_worker)
        t.start()

        for task_nbr in range(NUM):
            data = np.random.rand(400, 400, 3)
            data[0][0][0] = task_nbr  # just for integrity check
            with _wlock:
                _buf.append(data)

        with _wlock:
            _buf.append(_sentinel)
        t.join()
        sys.exit(1)


    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()
            assert num == message[0][0][0], 'Data was corrupted'


    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration

        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec
On my machine this takes 3.4 seconds to run, which is almost exactly the same as your Queue code above.
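The buffer-plus-feeder-thread idea can also be written with the standard-library queue.Queue as the buffer, which blocks while empty instead of busy-waiting. This is only a minimal Python 3 sketch: BufferedSender and demo are hypothetical names made up for illustration, not part of multiprocessing, and both ends of the pipe live in a single process here purely to keep the example short.

```python
import queue
import threading
from multiprocessing import Pipe


class BufferedSender:
    """Hypothetical helper: buffers objects and sends them from a feeder thread."""

    _SENTINEL = object()  # tells the feeder thread to stop

    def __init__(self, conn):
        self._conn = conn
        self._buf = queue.Queue()  # unbounded, thread-safe FIFO
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def _feed(self):
        while True:
            obj = self._buf.get()  # blocks until an item is available
            if obj is self._SENTINEL:
                return
            self._conn.send(obj)  # pickling and the pipe write happen here

    def put(self, obj):
        self._buf.put(obj)  # returns immediately; the caller keeps producing

    def close(self):
        self._buf.put(self._SENTINEL)
        self._thread.join()


def demo(n=5):
    # With duplex=False the first connection only receives, the second only sends.
    recv_conn, send_conn = Pipe(duplex=False)
    sender = BufferedSender(send_conn)
    for i in range(n):
        sender.put([i] * 1000)
    received = [recv_conn.recv()[0] for _ in range(n)]
    sender.close()
    return received
```

In the benchmark above, the worker would call put where it currently calls conn.send, and close before exiting. The buffer is unbounded in this sketch, so a producer that is much faster than the pipe will accumulate pending arrays in memory.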
From https://docs.python.org/2/library/threading.html:
In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once... however, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
The Queue and Pipe differences are definitely an odd implementation detail until you dig into it a bit.
Answer 2
Judging by your print statements, I assume you are using Python 2. However, I cannot reproduce the strange behavior with Python 3, where Pipe is actually faster than Queue.
    import sys
    import time
    from multiprocessing import Process, Pipe, Queue

    import numpy as np

    NUM = 20000


    def worker_pipe(conn):
        for task_nbr in range(NUM):
            conn.send(np.random.rand(40, 40, 3))
        sys.exit(1)


    def main_pipe():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker_pipe, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()


    def pipe_test():
        start_time = time.time()
        main_pipe()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print("Pipe")
        print("Duration: " + str(duration))
        print("Messages Per Second: " + str(msg_per_sec))


    def worker_queue(q):
        for task_nbr in range(NUM):
            q.put(np.random.rand(40, 40, 3))
        sys.exit(1)


    def main_queue():
        recv_q = Queue()
        Process(target=worker_queue, args=(recv_q,)).start()
        for num in range(NUM):
            message = recv_q.get()


    def queue_test():
        start_time = time.time()
        main_queue()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print("Queue")
        print("Duration: " + str(duration))
        print("Messages Per Second: " + str(msg_per_sec))


    if __name__ == "__main__":
        for i in range(2):
            queue_test()
            pipe_test()
Results in:
    Queue
    Duration: 3.44321894646
    Messages Per Second: 5808.51822408
    Pipe
    Duration: 2.69065594673
    Messages Per Second: 7433.13169575
    Queue
    Duration: 3.45295906067
    Messages Per Second: 5792.13354361
    Pipe
    Duration: 2.78426194191
    Messages Per Second: 7183.23218766
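When comparing these numbers with the timings in the question, keep in mind that the two benchmarks send very different messages: the question uses 400 x 400 x 3 arrays, while this test uses 40 x 40 x 3 arrays and more of them. A quick back-of-the-envelope calculation, assuming the float64 elements that np.random.rand returns and ignoring pickle overhead (message_bytes is a hypothetical helper for illustration):

```python
def message_bytes(shape, itemsize=8):
    """Raw size of one array in bytes; itemsize=8 assumes float64 elements."""
    total = itemsize
    for dim in shape:
        total *= dim
    return total


question_msg = message_bytes((400, 400, 3))  # bytes per message in the question
answer_msg = message_bytes((40, 40, 3))      # bytes per message in this test

question_total = 1000 * question_msg         # NUM = 1000 in the question
answer_total = 20000 * answer_msg            # NUM = 20000 in this test

print(question_msg, answer_msg)      # 3840000 38400
print(question_total, answer_total)  # 3840000000 768000000
```

Each message in the question is therefore 100 times larger, so the per-message throughput figures of the two benchmarks are not directly comparable.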