Python Multiprocessing with ZeroMQ

Recently I’ve begun exploring the depths of ZeroMQ and the pyzmq bindings. This post is not an introduction to ZeroMQ, but for a basic rundown, the “0MQ in A Hundred Words” blurb from the project site suffices:

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pubsub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

For more detail I highly recommend reading 0MQ, The Guide.

As I was going over the various python code examples on github, I became interested in the taskvent / tasksink / taskwork examples. The pattern was recognizable as one I often use for distributed processing. In the past, I typically would have implemented such a workflow with the python multiprocessing library, using its Queue class to communicate between processes. Recently I’ve implemented several data processing pipelines for work using the same technique, but with zeromq channels for communication, and I’ve been extremely pleased with both the performance and the ease of use. So, I decided to write a short blog post with a simple example for others working on the same sorts of problems.
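For contrast, here is roughly what the multiprocessing-only version of that pattern looks like. This is a minimal sketch rather than code from any of those pipelines; the worker count and task range are arbitrary:

import multiprocessing

def queue_worker(work_queue, result_queue):
    # Pull numbers off the work queue until a None sentinel arrives
    for num in iter(work_queue.get, None):
        result_queue.put(num * num)

if __name__ == "__main__":
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # Start a small pool of worker processes
    workers = [multiprocessing.Process(target=queue_worker,
                                       args=(work_queue, result_queue))
               for _ in range(4)]
    for w in workers:
        w.start()

    # Hand out the work, then collect one result per task
    for num in range(10):
        work_queue.put(num)
    for _ in range(10):
        print(result_queue.get())

    # Shut the workers down with one sentinel apiece
    for w in workers:
        work_queue.put(None)

The Queue version works well enough on a single machine; the appeal of the zeromq channels, as we’ll see below, is that the same structure isn’t tied to one box.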

For the example, I’ve implemented a small distributed system that calculates the squares of a series of numbers, based on the python examples. The pieces are as follows:

ventilator

  • The ventilator sends messages containing the numbers to be squared.
  • Uses a ZMQ_PUSH socket to send messages to workers.

workers

  • The workers receive work messages from the ventilator, do the work, and send the results down the pipe.
  • Uses a ZMQ_PULL socket to receive work messages from the ventilator.
  • Uses a ZMQ_PUSH socket to send answers to the results manager.
  • Uses a ZMQ_SUB socket to receive the FINISHED message from the results manager.

results manager

  • The results manager receives all answers from all workers, prints them, and sends a message to the workers to shut down when all tasks are complete.
  • Uses a ZMQ_PULL socket to receive answers from the workers.
  • Uses a ZMQ_PUB socket to send the FINISHED message to the workers.

A diagram of the basic flow:

[Process diagram: the ventilator PUSHes work messages to the workers; each worker PUSHes its answers to the results manager; the results manager PUBlishes the FINISHED message back to all workers.]

So, without further ado, let’s look at the code!

First, we import zmq, time, and the multiprocessing Process class:

import time
import zmq
from multiprocessing import Process

ventilator

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Send the numbers 0 through 9,999 as work messages
    for num in range(10000):
        work_message = { 'num' : num }
        ventilator_send.send_json(work_message)

    # Give zeromq time to deliver the queued messages before the process exits
    time.sleep(1)

worker

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from work_receiver channel, square the number
        # and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_json()
            product = work_message['num'] * work_message['num']
            answer_message = { 'worker' : wrk_num, 'result' : product }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

results manager

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(10000):
        result_message = results_receiver.recv_json()
        print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

And away we go…

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(10)
    for wrk_num in range(len(worker_pool)):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    # (The process handles get names distinct from the target functions, so
    # multiprocessing can pickle the targets on platforms that spawn rather
    # than fork -- see the comments below.)
    result_manager_proc = Process(target=result_manager, args=())
    result_manager_proc.start()

    # Start the ventilator!
    ventilator_proc = Process(target=ventilator, args=())
    ventilator_proc.start()

The code from this example is available at: https://github.com/taotetek/blog_examples

You may have noticed that I am using the tcp transport for communication between my processes. This is what I personally find most exciting about ZeroMQ: if you use ZeroMQ for communication between the processes in a multiprocess system, scaling the code out to run on multiple servers is close to trivial.
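For example, moving the workers to a second machine only requires changing the endpoint strings they connect to. Here is a sketch of the worker’s socket setup, using a made-up address for the box running the ventilator and results manager:

import zmq

context = zmq.Context()

# Same sockets as the worker above, but pointed at a hypothetical
# remote host (192.168.1.10) instead of 127.0.0.1. The ventilator and
# results manager bind on that machine; the rest of the worker code
# is unchanged.
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://192.168.1.10:5557")

results_sender = context.socket(zmq.PUSH)
results_sender.connect("tcp://192.168.1.10:5558")

control_receiver = context.socket(zmq.SUB)
control_receiver.connect("tcp://192.168.1.10:5559")
control_receiver.setsockopt(zmq.SUBSCRIBE, "")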

This only begins to scratch the surface of ZeroMQ. ZeroMQ supports multiple transports and many more topologies, and the library is available for multiple languages. It is a fantastic tool for both distributed computing and polyglot programming.
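As one small taste of the transport flexibility: on a single host, the tcp endpoints could be swapped for ipc (Unix domain socket) endpoints without touching the messaging code. A sketch, assuming a Unix-like system with a writable /tmp:

import zmq

context = zmq.Context()

# The same PUSH/PULL channel as in the example, but over the ipc
# transport; only the endpoint string differs from the tcp version.
sender = context.socket(zmq.PUSH)
sender.bind("ipc:///tmp/ventilator.ipc")

receiver = context.socket(zmq.PULL)
receiver.connect("ipc:///tmp/ventilator.ipc")

sender.send_json({'num': 42})
print(receiver.recv_json())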


15 Responses to Python Multiprocessing with ZeroMQ

  1. Pingback: links for 2011-02-04 « Bloggitation

  2. FYI, the worker thread has lost its indentation, and the break call results in an error.
    The indentation in the GitHub file is correct: https://github.com/taotetek/blog_examples/blob/master/python_multiprocessing_with_zeromq/workqueue_example.py

  3. Hi Brian,

    Running the code with Python 2.7 (32 and 64-bit), gevent 0.13.3, greenlet 0.3.1 and pyzmq 2.0.10.1 (although that's not important) results in the following error:

    Traceback (most recent call last):
      File "C:\Users\adam\workspace\python\ComPy\src\main.py", line 125, in
        result_manager.start()
      File "C:\Program Files\Python27\lib\multiprocessing\process.py", line 104, in start
        self._popen = Popen(self)
      File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 244, in __init__
        dump(process_obj, to_child, HIGHEST_PROTOCOL)
      File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 167, in dump
        ForkingPickler(file, protocol).dump(obj)
      File "C:\Program Files\Python27\lib\pickle.py", line 224, in dump
        self.save(obj)
      File "C:\Program Files\Python27\lib\pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "C:\Program Files\Python27\lib\pickle.py", line 419, in save_reduce
        save(state)
      File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "C:\Program Files\Python27\lib\pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "C:\Program Files\Python27\lib\pickle.py", line 681, in _batch_setitems
        save(v)
      File "C:\Program Files\Python27\lib\pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "C:\Program Files\Python27\lib\pickle.py", line 753, in save_global
        (obj, module, name))
    pickle.PicklingError: Can't pickle : it's not the same object as __main__.result_manager
    Traceback (most recent call last):
      File "", line 1, in
      File "C:\Program Files\Python27\lib\multiprocessing\forking.py", line 347, in main
        self = load(from_parent)
      File "C:\Program Files\Python27\lib\pickle.py", line 1378, in load
        return Unpickler(file).load()
      File "C:\Program Files\Python27\lib\pickle.py", line 858, in load
        dispatch[key](self)
      File "C:\Program Files\Python27\lib\pickle.py", line 880, in load_eof
        raise EOFError
    EOFError

    It seems that the main function is getting confused by the multiple use of “result_manager” and “ventilator” as both a function and an object.
    For example:
    # Fire up our result manager…
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()

    The resolution is to give them different names. I’ve changed the functions to have _func after them. This resolves the issues.

  4. Pingback: Python Multiprocessing with ZeroMQ « Twisted Pair Development

  5. Timothy M. Shead says:

    I’ve been playing with similar ideas – one issue that I have is that the ventilator and result manager have to have a shared knowledge of how much work has to be done (10000 messages in this case), which is dissatisfying – ideally, I’d like the ventilator to signal when it’s through sending data, but I don’t see how to coordinate that properly …

    Cheers,
    Tim

    • taotetek says:

      I haven’t had time to whip up a full example – but you can solve this by setting up a direct communication channel between the ventilator and the results manager. The ventilator can then tell the results manager how much work it should be expecting in total.
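      In outline, something like the following could work – an untested sketch, using a hypothetical extra PUSH/PULL channel on port 5560; each half would live in the respective function from the post:

      import zmq

      context = zmq.Context()

      # Ventilator side: push the expected task count to the results
      # manager before sending the work itself.
      count_sender = context.socket(zmq.PUSH)
      count_sender.connect("tcp://127.0.0.1:5560")
      count_sender.send_json({'expected': 10000})

      # Results manager side: read the count, then use it as the loop
      # bound in place of the hard-coded range(10000).
      count_receiver = context.socket(zmq.PULL)
      count_receiver.bind("tcp://127.0.0.1:5560")
      expected = count_receiver.recv_json()['expected']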

  6. Zack Perry says:

    Hi,

    Your blog motivated me to learn about 0MQ. Today I modified your code to do the following – let the ventilator send the workers an array of a good size, instead of numbers. The issue that I ran into is that once the string_length variable near the top of the module is larger than 1024 * 1024 * 3, the code pauses after the result manager gets back an answer. Have you seen anything like this? If so, how did you resolve it? I am running Ubuntu 11.04 64-bit on a machine with 2 cores and 8GB RAM, with Python 2.7.1+, libzmq1 2.1.10 and pyzmq 2.1.10.

    Thanks!

    –Zack
    import time
    import zmq
    from multiprocessing import Process, cpu_count
    from os import urandom

    np = cpu_count()
    pool_size = np
    number_of_elements = 128
    # Odd, why once the slen is bumped to 3MB or above, the code hangs?
    string_length = 1024 * 1024 * 3

    def create_inputs(nelem, slen, pb=True):
        '''
        Generates an array that contains nelem fix-sized (of slen bytes)
        random strings and an accompanying array of hexdigests of the
        former's elements. Both are returned in a tuple.

        :type nelem: int
        :param nelem: The desired number of elements in the to be
            generated array.
        :type slen: int
        :param slen: The desired number of bytes of each array element.
        :type pb: bool
        :param pb: If True, displays a text progress bar during input
            array generation.
        '''
        from os import urandom
        import sys
        import hashlib

        if pb:
            if nelem <= 64:
                toolbar_width = nelem
                chunk_size = 1
            else:
                toolbar_width = 64
                chunk_size = nelem // toolbar_width
            array_description = '%d random strings of size %d bytes. ' % (nelem, slen)
            s = ''.join(('Generating an array of ', array_description, 'Working ...\n'))
            sys.stdout.write(s)
            # create an ASCII progress bar
            sys.stdout.write("[%s]" % (" " * toolbar_width))
            sys.stdout.flush()
            sys.stdout.write("\b" * (toolbar_width+1))
        array = list()
        hash4a = list()
        try:
            for i in range(nelem):
                e = urandom(int(slen))
                array.append(e)
                h = hashlib.md5()
                h.update(e)
                he = h.hexdigest()
                hash4a.append(he)
                i += 1
                if pb and i and i % chunk_size == 0:
                    sys.stdout.write("-")
                    sys.stdout.flush()
            if pb:
                sys.stdout.write("\n")
        except MemoryError:
            print('Memory Error: discarding existing arrays')
            array = list()
            hash4a = list()
        finally:
            return array, hash4a

    # The "ventilator" function generates an array of nelem fix-sized (of slen
    # bytes long) random strings, and sends the array down a zeromq "PUSH"
    # connection to be processed by listening workers, in a round robin load
    # balanced fashion.

    def ventilator():
        # Initialize a zeromq context
        context = zmq.Context()

        # Set up a channel to send work
        ventilator_send = context.socket(zmq.PUSH)
        ventilator_send.bind("tcp://127.0.0.1:5557")

        # Give everything a second to spin up and connect
        time.sleep(1)

        # Create the input array
        nelem = number_of_elements
        slen = string_length
        payloads = create_inputs(nelem, slen)

        # Send an array to each worker
        for num in range(np):
            work_message = { 'num' : payloads }
            ventilator_send.send_pyobj(work_message)

        time.sleep(1)

    # The "worker" functions listen on a zeromq PULL connection for "work"
    # (array to be processed) from the ventilator, get the length of the array
    # and send the results down another zeromq PUSH connection to the results
    # manager.

    def worker(wrk_num):
        # Initialize a zeromq context
        context = zmq.Context()

        # Set up a channel to receive work from the ventilator
        work_receiver = context.socket(zmq.PULL)
        work_receiver.connect("tcp://127.0.0.1:5557")

        # Set up a channel to send result of work to the results reporter
        results_sender = context.socket(zmq.PUSH)
        results_sender.connect("tcp://127.0.0.1:5558")

        # Set up a channel to receive control messages over
        control_receiver = context.socket(zmq.SUB)
        control_receiver.connect("tcp://127.0.0.1:5559")
        control_receiver.setsockopt(zmq.SUBSCRIBE, "")

        # Set up a poller to multiplex the work receiver and control receiver channels
        poller = zmq.Poller()
        poller.register(work_receiver, zmq.POLLIN)
        poller.register(control_receiver, zmq.POLLIN)

        # Loop and accept messages from both channels, acting accordingly
        while True:
            socks = dict(poller.poll())

            # If the message came from work_receiver channel, get the length
            # of the array and send the answer to the results reporter
            if socks.get(work_receiver) == zmq.POLLIN:
                #work_message = work_receiver.recv_json()
                work_message = work_receiver.recv_pyobj()
                length = len(work_message['num'][0])
                answer_message = { 'worker' : wrk_num, 'result' : length }
                results_sender.send_json(answer_message)

            # If the message came over the control channel, shut down the worker.
            if socks.get(control_receiver) == zmq.POLLIN:
                control_message = control_receiver.recv()
                if control_message == "FINISHED":
                    print("Worker %i received FINISHED, quitting!" % wrk_num)
                    break

    # The "results_manager" function receives each result from multiple workers,
    # and prints those results. When all results have been received, it signals
    # the worker processes to shut down.

    def result_manager():
        # Initialize a zeromq context
        context = zmq.Context()

        # Set up a channel to receive results
        results_receiver = context.socket(zmq.PULL)
        results_receiver.bind("tcp://127.0.0.1:5558")

        # Set up a channel to send control commands
        control_sender = context.socket(zmq.PUB)
        control_sender.bind("tcp://127.0.0.1:5559")

        for task_nbr in range(np):
            result_message = results_receiver.recv_json()
            print("Worker %i answered: %i" % (result_message['worker'], result_message['result']))

        # Signal to all workers that we are finished
        control_sender.send("FINISHED")
        time.sleep(5)

    if __name__ == "__main__":

        # Create a pool of workers to distribute work to
        worker_pool = range(pool_size)
        for wrk_num in range(len(worker_pool)):
            Process(target=worker, args=(wrk_num,)).start()

        # Fire up our result manager...
        result_manager_proc = Process(target=result_manager, args=())
        result_manager_proc.start()

        # Start the ventilator!
        ventilator_proc = Process(target=ventilator, args=())
        ventilator_proc.start()

  7. rogerdpack says:

    So you’re basically using multiprocess Process in place of fork here, is that right? (I.e. it doesn’t add much to the equation?) Just wondering.

  8. Pingback: Scott Banwart's Blog › Distributed Weekly 206

  9. dallas says:

    For those attempting this cool example, I found an issue with Python 3 and zmq 2.1.11 on Ubuntu 12.04.

    Change the following lines, to avoid a TypeError:

    control_receiver.setsockopt(zmq.SUBSCRIBE, "")
    control_sender.send("FINISHED")

    to:

    control_receiver.setsockopt_unicode(zmq.SUBSCRIBE, '')
    control_sender.send_unicode("FINISHED")
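    Equivalently, since ZeroMQ message frames are byte strings, passing bytes literals also works under Python 3 – note that the worker's comparison then has to check against bytes as well:

    control_receiver.setsockopt(zmq.SUBSCRIBE, b"")
    control_sender.send(b"FINISHED")

    # ...and in the worker:
    if control_message == b"FINISHED":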
