Recently, I’ve begun exploring the depths of ZeroMQ and the pyzmq bindings. This post is not an introduction to ZeroMQ, but for a basic rundown the “0MQ in A Hundred Words” blurb from the project site suffices:
ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pubsub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.
For more detail I highly recommend reading 0MQ, The Guide.
As I was going over the various python code examples on github, I became interested in the taskvent / tasksink / taskwork examples. The pattern was recognizable as one I often use for distributed processing. In the past, I typically would have implemented such a workflow with the python multiprocessing library, using its Queue class to communicate between processes. Recently I’ve implemented several data processing pipelines for work using the same technique, but using zeromq channels for communication, and I’ve been extremely pleased with both the performance and the ease of use. So, I decided to write a short blog post with a simple example for others working on the same sorts of problems.
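For contrast, the Queue-based version of that pattern can be sketched in a few lines. This is a minimal sketch using the stdlib multiprocessing library; the function names are my own:

```python
from multiprocessing import Process, Queue

def square_worker(work_q, result_q):
    # Pull numbers until the None sentinel arrives, push squares back.
    for num in iter(work_q.get, None):
        result_q.put(num * num)

def run_queue_pipeline(numbers, n_workers=2):
    numbers = list(numbers)
    work_q, result_q = Queue(), Queue()
    workers = [Process(target=square_worker, args=(work_q, result_q))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for num in numbers:
        work_q.put(num)
    for _ in workers:
        work_q.put(None)       # one sentinel per worker shuts it down
    results = [result_q.get() for _ in numbers]
    for w in workers:
        w.join()
    return sorted(results)

print(run_queue_pipeline(range(5)))  # [0, 1, 4, 9, 16]
```

The zeromq version below follows the same shape, but swaps the Queue objects for PUSH/PULL sockets, which is what later makes it possible to spread the processes across machines.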
For the example, I’ve implemented a small distributed system that calculates the squares of a series of numbers, based on the python examples. The pieces are as follows:
ventilator
- The ventilator sends messages containing the numbers to be squared.
- Uses a ZMQ_PUSH socket to send messages to workers.
workers
- The workers receive messages from the ventilator, do the work, and send the results down the pipe.
- Uses a ZMQ_PUSH socket to send answers to the results manager.
- Uses a ZMQ_SUB socket to receive the FINISH message from the results manager.
results manager
- The results manager receives all answers from all workers, prints them, and sends a message to the workers to shut down when all tasks are complete.
- Uses a ZMQ_PULL socket to receive answers from the workers.
- Uses a ZMQ_PUB socket to send the FINISH message to the workers.
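A detail worth knowing about the PUSH/PULL pairs above: the PUSH side distributes messages round-robin across its connected PULL peers (real PUSH sockets also skip peers whose queues are full). As a rough mental model in plain Python, with no sockets involved:

```python
from itertools import cycle
from collections import defaultdict

def push_round_robin(messages, n_workers):
    # Approximates how a PUSH socket spreads messages over its connected
    # PULL peers: each message goes to the next peer in turn.
    assignments = defaultdict(list)
    peers = cycle(range(n_workers))
    for msg in messages:
        assignments[next(peers)].append(msg)
    return dict(assignments)

print(push_round_robin(range(6), 3))  # {0: [0, 3], 1: [1, 4], 2: [2, 5]}
```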
A diagram of the basic flow:
So, without further ado, let’s look at the code!
First, we import zmq, time, and the multiprocessing Process class:
import time
import zmq
from multiprocessing import Process
ventilator
def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Send the numbers 0 through 9999 as work messages
    for num in range(10000):
        work_message = { 'num' : num }
        ventilator_send.send_json(work_message)

    time.sleep(1)
worker
def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from work_receiver channel, square the number
        # and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_json()
            product = work_message['num'] * work_message['num']
            answer_message = { 'worker' : wrk_num, 'result' : product }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break
results manager
def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(10000):
        result_message = results_receiver.recv_json()
        print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)
And away we go…
if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(10)
    for wrk_num in range(len(worker_pool)):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager... (the process object gets its own name so
    # it does not shadow the result_manager function, which breaks pickling
    # on platforms without fork)
    result_manager_process = Process(target=result_manager, args=())
    result_manager_process.start()

    # Start the ventilator!
    ventilator_process = Process(target=ventilator, args=())
    ventilator_process.start()
The code from this example is available at: https://github.com/taotetek/blog_examples
You may have noticed that I am using the tcp transport for communication between my processes. This is what I personally find the most exciting about ZeroMQ: if you use ZeroMQ for communication between processes in a multiprocess system, it is close to trivial to scale the code to run on multiple servers.
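To make that concrete: moving from one box to several is mostly a matter of changing the endpoint strings the sockets bind and connect to. A sketch of the idea (the cluster hostnames here are hypothetical):

```python
# Single-box endpoints, as used in the example above.
LOCAL = {
    'work':    "tcp://127.0.0.1:5557",   # ventilator binds, workers connect
    'results': "tcp://127.0.0.1:5558",   # results manager binds, workers connect
    'control': "tcp://127.0.0.1:5559",   # results manager binds, workers subscribe
}

# Multi-server endpoints: the socket code stays identical, only these
# strings change. The hostnames are made up for illustration.
CLUSTER = {
    'work':    "tcp://ventilator-host:5557",
    'results': "tcp://sink-host:5558",
    'control': "tcp://sink-host:5559",
}

def endpoints(distributed=False):
    # A worker would call e.g. work_receiver.connect(endpoints(True)['work'])
    return CLUSTER if distributed else LOCAL
```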
This only begins to scratch the surface of ZeroMQ. ZeroMQ supports multiple transports and many more topologies, and the library is available for multiple languages. It is a fantastic tool for both distributed computing and polyglot programming.
FYI, the worker thread has lost its indentation and the break call results in an error.
The indentation in the Github file is correct, https://github.com/taotetek/blog_examples/blob/master/python_multiprocessing_with_zeromq/workqueue_example.py
Thanks for the heads up, I’ll fix the example on the site once I get a few spare minutes!
Hi Brian,
Running the code with Python 2.7 (32 and 64-bit), gevent 0.13.3, greenlet 0.3.1 and pyzmq 2.0.10.1 (although that's not important) results in the following error:
Traceback (most recent call last):
  File "C:\Users\adam\workspace\python\ComPy\src\main.py", line 125, in
    result_manager.start()
  File "C:\Program Files\Python27\lib\multiprocessing\process.py", line 104, in start
    self._popen = Popen(self)
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 244, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 167, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Program Files\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle : it's not the same object as __main__.result_manager

Traceback (most recent call last):
  File "", line 1, in
  File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 347, in main
    self = load(from_parent)
  File "C:\Program Files\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Program Files\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Program Files\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
It seems that the main function is getting confused by the multiple use of “result_manager” and “ventilator” as both a function and an object.
For example:
    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()
The resolution is to give them different names. I’ve changed the functions to have _func after them. This resolves the issues.
Thanks for the heads up, I’ll fix the example once I get a few spare minutes!
I’ve been playing with similar ideas. One issue that I have is that the ventilator and results manager must share knowledge of how much work has to be done (10000 messages in this case), which is dissatisfying. Ideally, I’d like the ventilator to signal when it’s through sending data, but I don’t see how to coordinate that properly…
Cheers,
Tim
I haven’t had time to whip up an example, but you can solve this by setting up a direct communication channel between the ventilator and the results manager. The ventilator can then tell the results manager how much work it should be expecting in total.
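Sketched as plain message logic (no sockets here, and the names are made up): the ventilator's last act is to tell the sink how many work messages it actually sent, and the sink pulls answers until its count matches.

```python
def make_done_message(total_sent):
    # The ventilator would send this to the results manager over a
    # dedicated PUSH/PULL channel once it has finished sending work.
    return {'expected': total_sent}

def sink_should_stop(done_message, results_seen):
    # The sink keeps pulling answers until the ventilator has said how
    # many to expect AND that many have arrived.
    return done_message is not None and results_seen >= done_message['expected']

# Until the "done" message arrives, the sink never stops early.
print(sink_should_stop(None, 100))                    # False
print(sink_should_stop(make_done_message(100), 100))  # True
```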
Hi,
Your blog motivated me to learn about 0MQ. Today I modified your code to have the ventilator send the workers a sizable array instead of single numbers. The issue that I ran into is that once the string_length variable near the top of the module is larger than 1024 * 1024 * 3, the code pauses after the result manager gets back an answer. Have you seen anything like this? If so, how did you resolve it? I am running Ubuntu 11.04 64-bit on a machine with 2 cores and 8GB RAM, with Python 2.7.1+, libzmq 2.1.10 and pyzmq 2.1.10.
Thanks!
–Zack
import time
import zmq
from multiprocessing import Process, cpu_count
from os import urandom

np = cpu_count()
pool_size = np
number_of_elements = 128
# Odd, why once the slen is bumped to 3MB or above, the code hangs?
string_length = 1024 * 1024 * 3

def create_inputs(nelem, slen, pb=True):
    '''
    Generates an array that contains nelem fix-sized (of slen bytes)
    random strings and an accompanying array of hexdigests of the
    former's elements. Both are returned in a tuple.

    :type nelem: int
    :param nelem: The desired number of elements in the to be generated
                  array.
    :type slen: int
    :param slen: The desired number of bytes of each array element.
    :type pb: bool
    :param pb: If True, displays a text progress bar during input array
               generation.
    '''
    from os import urandom
    import sys
    import hashlib

    if pb:
        if nelem <= 64:
            toolbar_width = nelem
            chunk_size = 1
        else:
            toolbar_width = 64
            chunk_size = nelem // toolbar_width
        array_description = '%d random strings of size %d bytes. ' % (nelem, slen)
        s = ''.join(('Generating an array of ', array_description, 'Working ...\n'))
        sys.stdout.write(s)
        # create an ASCII progress bar
        sys.stdout.write("[%s]" % (" " * toolbar_width))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width + 1))

    array = list()
    hash4a = list()
    try:
        for i in range(nelem):
            e = urandom(int(slen))
            array.append(e)
            h = hashlib.md5()
            h.update(e)
            he = h.hexdigest()
            hash4a.append(he)
            i += 1
            if pb and i and i % chunk_size == 0:
                sys.stdout.write("-")
                sys.stdout.flush()
        if pb:
            sys.stdout.write("\n")
    except MemoryError:
        print('Memory Error: discarding existing arrays')
        array = list()
        hash4a = list()
    finally:
        return array, hash4a

# The "ventilator" function generates an array of nelem fix-sized (slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.
def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Create the input array
    nelem = number_of_elements
    slen = string_length
    payloads = create_inputs(nelem, slen)

    # Send an array to each worker
    for num in range(np):
        work_message = { 'num' : payloads }
        ventilator_send.send_pyobj(work_message)

    time.sleep(1)

# The "worker" functions listen on a zeromq PULL connection for "work"
# (an array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.
def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from work_receiver channel, get the length
        # of the array and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            #work_message = work_receiver.recv_json()
            work_message = work_receiver.recv_pyobj()
            length = len(work_message['num'][0])
            answer_message = { 'worker' : wrk_num, 'result' : length }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

# The "result_manager" function receives each result from multiple workers,
# and prints those results. When all results have been received, it signals
# the worker processes to shut down.
def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(np):
        result_message = results_receiver.recv_json()
        print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(pool_size)
    for wrk_num in range(len(worker_pool)):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager_process = Process(target=result_manager, args=())
    result_manager_process.start()

    # Start the ventilator!
    ventilator_process = Process(target=ventilator, args=())
    ventilator_process.start()
Just a quick follow-up. I posted a more detailed Q on stackoverflow, and got an insightful answer. Please see: http://stackoverflow.com/questions/8905147/why-does-this-python-0mq-script-for-distributed-computing-hang-at-a-fixed-input. Thanks!
— Zack
“Your blog motivated me to learn about 0MQ” is a great sentence to see the day I decide to reinvigorate the blog. Thank you for the feedback!
So you’re basically using multiprocess Process in place of fork here, is that right? (I.e. it doesn’t add much to the equation?) Just wondering.
Yes, I was just using it as a fork.
For those attempting this cool example: I found an issue with python3 and zmq 2.1.11 on ubuntu 12.04.
Change the following lines to avoid a TypeError:

    control_receiver.setsockopt(zmq.SUBSCRIBE, "")
    control_sender.send("FINISHED")

to:

    control_receiver.setsockopt_unicode(zmq.SUBSCRIBE, "")
    control_sender.send_unicode("FINISHED")