ZeroMQ Input / Output Plugins for Rsyslog

Just a quick note that Aggregate Knowledge, where I work as Service Delivery Data Architect, has made our first open source release this week – ZeroMQ input and output plugins for rsyslog. Give them a spin if that’s something you’re interested in! You can find them at the Aggregate Knowledge ZeroMQ Rsyslog Plugin repository on github.


How Much Do My Date Partitions Grow Each Day? – Stupid PostgreSQL Tricks

Today I had to do some quick analysis on a data warehouse to see how much the database was growing each day. The PostgreSQL database was date partitioned in a fairly standard way: each partitioned table had child tables following the naming convention “tablename_YYYY_MM_DD”, and those tables lived in a schema named “partitions”. Several hundred tables were partitioned this way. Here’s a quick bash one-liner that totals the size of all partitions for a given day:

echo "SELECT sum(pg_total_relation_size(schemaname || '.' || tablename)) FROM pg_tables WHERE schemaname = 'partitions' AND tablename LIKE '%_2011_04_12'" | psql database_name


PostgreSQL 9, Listen / Notify, and JRuby – Part 2

I worked up one more example, this time using PL/Ruby rather than PL/pgSQL, and using YAML to serialize the NEW record returned by the trigger, send it to the client, and reconstitute it as a hash.

The PL/Ruby stored proc:

CREATE FUNCTION beam_me_up() RETURNS TRIGGER AS $$
require 'yaml'
payload = new.to_yaml
$Plans["engage"] = PL::Plan.new("NOTIFY watchers, '#{payload}'")
$Plans["engage"].exec()
$$ LANGUAGE 'plruby';

CREATE TRIGGER beam_me_up_trigger AFTER INSERT ON watched_table
FOR EACH ROW EXECUTE PROCEDURE beam_me_up();

And here’s the test client:

require 'rubygems'
require 'bundler/setup'
require 'java'
require 'yaml'

$LOAD_PATH << 'vendor/jars/'
require 'postgresql-9.0-801.jdbc3.jar'

# set up our database connection to the example database...
java_import java.sql.DriverManager
DriverManager.register_driver(org.postgresql.Driver.new)
url = "jdbc:postgresql://localhost/listen_notify_poller"

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  cnt = 0
  while cnt < 10
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    cnt = cnt + 1
    sleep 3
  end
end

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)

  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  while true
    sleep 1
    puts 'polling...'

    notifications = listen_conn.get_notifications || []

    notifications.each do |notification|
      unless notification.nil?
        test = YAML::load(notification.parameter)
        puts test.inspect
      end
    end
  end
end

insert_thread = Thread.new{insert_thread(url)}
listen_thread = Thread.new{listen_thread(url)}

listen_thread.join
insert_thread.join

PostgreSQL 9, Listen / Notify, and JRuby

The LISTEN / NOTIFY functionality of PostgreSQL received a significant boost in the 9.0 release. Since I’ve lately been experimenting with JRuby, I decided to write up some example code that uses LISTEN / NOTIFY via JDBC.

LISTEN / NOTIFY basically provides a publish / subscribe message bus within PostgreSQL. Clients can register as a subscriber on a channel, and will receive all notifications sent to that channel.

9.0 added a “payload” option to the notifier – a notification can contain an optional text string.

I worked up a quick test of using this functionality in JRuby.

The example sets up the following table in a local PostgreSQL 9.0 database:

CREATE TABLE watched_table (
  id              SERIAL PRIMARY KEY,
  value           INT,
  date_updated    TIMESTAMP NOT NULL DEFAULT now()
);

It then sets up a trigger that, upon insert, sends a notify event:

CREATE FUNCTION notify_trigger() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', TG_TABLE_NAME || ',id,' || NEW.id );
  RETURN new;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER watched_table_trigger AFTER INSERT ON watched_table
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();

In the example, the first thread inserts into this table every three seconds:

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  cnt = 0 
  while cnt < 10
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    cnt = cnt + 1 
    sleep 3
  end 
end

The listener then connects and polls PostgreSQL every second for new notifications. Each notification carries the id of the inserted record, which the listener thread uses to retrieve the row:

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  while true
    sleep 1
    puts 'polling...'

    notifications = listen_conn.get_notifications || []

    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}"
        puts "param: #{notification.parameter}"
        puts "-------------------------"

        id = notification.parameter.split(',').last
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end

        puts "-------------------------"

        rs.close
        stmt.close
      end
    end
  end
end

Here’s the code of the entire example:

require 'rubygems'
require 'bundler/setup'
require 'java'

$LOAD_PATH << 'vendor/jars/'
require 'postgresql-9.0-801.jdbc3.jar'

# set up our database connection to the example database...
java_import java.sql.DriverManager
DriverManager.register_driver(org.postgresql.Driver.new)
url = "jdbc:postgresql://localhost/listen_notify_poller"

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  cnt = 0
  while cnt < 10
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    cnt = cnt + 1
    sleep 3
  end
end

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  # register our client as a listener on the "watchers" channel....
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  # check for new notifications once a second...
  while true
    sleep 1
    puts 'polling...'

    # check for notifications
    notifications = listen_conn.get_notifications || []

    # if there are notifications, display a little info on them
    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}" 
        puts "param: #{notification.parameter}" 
        puts "-------------------------"

        # retrieve the inserted record this notification was for.
        id = notification.parameter.split(',').last
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end
      
        puts "-------------------------"
      
        rs.close
        stmt.close
      end
    end
  end
end

insert_thread = Thread.new{insert_thread(url)}
listen_thread = Thread.new{listen_thread(url)}

listen_thread.join
insert_thread.join

Mecha Multi Mechanize

A while back, I had the pleasure of contributing a small amount of code to Corey Goldberg’s excellent multi-mechanize web performance and load testing framework. Since I have been experimenting with using zeromq in place of the Python Queue object for communication between processes and threads, I decided that multi-mechanize would be an excellent code base to experiment on. So today I have started mecha-multi-mechanize. Over the next few weeks, I plan to play with incorporating zeromq (and hopefully eventlet) into the original code.

As a first step, today I ripped out the Queue object that was used to communicate between the agent threads and the resultswriter; here is the diff from the commit. I was pleased with how straightforward it was.
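
The basic shape of that change looks something like the sketch below – this is not the multi-mechanize code itself, just a minimal illustration of agent threads pushing results over zeromq PUSH sockets to a single PULL socket in place of a shared Queue (the port and message format are made up):

import threading
import zmq

context = zmq.Context()

def agent(agent_num, num_results):
    # each agent thread gets its own PUSH socket instead of sharing a Queue
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://127.0.0.1:5556")
    for i in range(num_results):
        sender.send_json({'agent': agent_num, 'latency': 0.1 * i})
    sender.close()

def results_writer(expected):
    # the results writer pulls everything the agents push
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://127.0.0.1:5556")
    for _ in range(expected):
        print(receiver.recv_json())
    receiver.close()

writer = threading.Thread(target=results_writer, args=(10,))
writer.start()
agents = [threading.Thread(target=agent, args=(n, 5)) for n in range(2)]
for t in agents:
    t.start()
for t in agents:
    t.join()
writer.join()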


Simple TCP to ZeroMQ Message Forwarder with Eventlet

I spent part of today playing with eventlet. Eventlet recently added zeromq support. Here’s a very simple message forwarder that forwards newline-terminated messages received over TCP to a zeromq push socket, using the zeromq hub:

import eventlet
from eventlet.green import socket, zmq


def read_socket(writer, reader):
    message = reader.readline()
    while message:
        writer.send(message)
        message = reader.readline()

def listen_socket(address, port):
    eventlet.hubs.use_hub("zeromq")    
    server = eventlet.listen((address, port))
    while True:
        new_connection, address = server.accept()
        new_zmq_context = zmq.Context()
        new_zmq_push_socket = new_zmq_context.socket(zmq.PUSH)
        new_zmq_push_socket.connect("tcp://127.0.0.1:5558")
        eventlet.spawn_n(read_socket, new_zmq_push_socket, new_connection.makefile('r'))

if __name__ == '__main__':
    listen_socket('127.0.0.1', 7000)
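
For completeness, here’s a sketch of what the other end of the pipe might look like – a plain pull socket bound on the port the forwarder’s push sockets connect to, printing each forwarded line. The port is an assumption that matches the forwarder above:

import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://127.0.0.1:5558")

while True:
    # each message is one newline-terminated line from a TCP client
    print(receiver.recv())

With both pieces running, you can throw lines at the forwarder with something like netcat: echo "hello" | nc 127.0.0.1 7000.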

Python Multiprocessing – ZeroMQ vs Queue

As a quick follow-up to my previous post, here’s a look at the performance of passing messages between two Python processes using the Queue class versus 0mq push / pull connections. For the test, we will pass 10 million messages between two processes, first using Queue, then using 0mq.

Multiprocessing test with Queue

import sys
import time
from  multiprocessing import Process, Queue

def worker(q):
    for task_nbr in range(10000000):
        message = q.get()
    sys.exit(1)
 
def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    for num in range(10000000):
        send_q.put("MESSAGE")

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec
 

Multiprocessing test with 0mq

import sys
import zmq
from  multiprocessing import Process
import time

def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(10000000):
        message = work_receiver.recv()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    for num in range(10000000):
        ventilator_send.send("MESSAGE")

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 10000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

Queue Results

python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq Results

python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

The numbers speak for themselves: the 0mq version moves roughly seven times as many messages per second. The difference comes largely from the fact that multiprocessing.Queue has to pickle every object and feed it through a lock-protected pipe, while 0mq hands small messages off to a background I/O thread.


Python Multiprocessing with ZeroMQ

Recently I’ve begun exploring the depths of ZeroMQ and the pyzmq bindings. This post is not an introduction to ZeroMQ, but for a basic rundown, the “0MQ in A Hundred Words” blurb from the project site suffices:

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pubsub, task distribution, and request-reply. It’s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

For more detail I highly recommend reading 0MQ, The Guide.

As I was going over the various Python code examples on github, I became interested in the taskvent / tasksink / taskwork examples. The pattern was recognizable as one I often use for distributed processing. In the past, I typically would have implemented such a workflow with the Python multiprocessing library, using its Queue class to communicate between processes. Recently I’ve implemented several data processing pipelines for work using the same technique, but with zeromq channels for communication, and I’ve been extremely pleased with both the performance and the ease of use. So, I decided to write a short blog post with a simple example for others working on the same sorts of problems.

For the example, I’ve implemented a small distributed system that calculates the squares of a series of numbers, based on the python examples. The pieces are as follows:

ventilator

  • The ventilator sends messages containing the numbers to be squared.
  • Uses a ZMQ_PUSH socket to send messages to workers.

workers

  • The workers receive messages from the ventilator, do the work, and send the results down the pipe.
  • Uses a ZMQ_PUSH socket to send answers to the results manager.
  • Uses a ZMQ_SUB socket to receive the FINISH message from the results manager.

results manager

  • The results manager receives all answers from all workers, prints them, and sends a message to the workers to shut down when all tasks are complete.
  • Uses a ZMQ_PULL socket to receive answers from the workers.
  • Uses a ZMQ_PUB socket to send the FINISH message to the workers.

A diagram of the basic flow:

[Process diagram: ventilator → workers → results manager]

So, without further ado, let’s look at the code!

First, we import zmq, time, and the multiprocessing Process class:

import time
import zmq
from  multiprocessing import Process

ventilator

def ventilator():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to send work
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")

    # Give everything a second to spin up and connect
    time.sleep(1)

    # Send ten thousand numbers (0 through 9999) as work messages
    for num in range(10000):
        work_message = { 'num' : num }
        ventilator_send.send_json(work_message)

    time.sleep(1)

worker

def worker(wrk_num):
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive work from the ventilator
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    # Set up a channel to send result of work to the results reporter
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://127.0.0.1:5558")

    # Set up a channel to receive control messages over
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://127.0.0.1:5559")
    control_receiver.setsockopt(zmq.SUBSCRIBE, "")

    # Set up a poller to multiplex the work receiver and control receiver channels
    poller = zmq.Poller()
    poller.register(work_receiver, zmq.POLLIN)
    poller.register(control_receiver, zmq.POLLIN)

    # Loop and accept messages from both channels, acting accordingly
    while True:
        socks = dict(poller.poll())

        # If the message came from the work_receiver channel, square the number
        # and send the answer to the results reporter
        if socks.get(work_receiver) == zmq.POLLIN:
            work_message = work_receiver.recv_json()
            product = work_message['num'] * work_message['num']
            answer_message = { 'worker' : wrk_num, 'result' : product }
            results_sender.send_json(answer_message)

        # If the message came over the control channel, shut down the worker.
        if socks.get(control_receiver) == zmq.POLLIN:
            control_message = control_receiver.recv()
            if control_message == "FINISHED":
                print("Worker %i received FINISHED, quitting!" % wrk_num)
                break

results manager

def result_manager():
    # Initialize a zeromq context
    context = zmq.Context()

    # Set up a channel to receive results
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")

    # Set up a channel to send control commands
    control_sender = context.socket(zmq.PUB)
    control_sender.bind("tcp://127.0.0.1:5559")

    for task_nbr in range(10000):
        result_message = results_receiver.recv_json()
        print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])

    # Signal to all workers that we are finished
    control_sender.send("FINISHED")
    time.sleep(5)

And away we go…

if __name__ == "__main__":

    # Create a pool of workers to distribute work to
    worker_pool = range(10)
    for wrk_num in range(len(worker_pool)):
        Process(target=worker, args=(wrk_num,)).start()

    # Fire up our result manager...
    result_manager = Process(target=result_manager, args=())
    result_manager.start()

    # Start the ventilator!
    ventilator = Process(target=ventilator, args=())
    ventilator.start()

The code from this example is available at: https://github.com/taotetek/blog_examples

You may have noticed that I am using the tcp transport for communication between my processes.  This is what I personally find the most exciting about ZeroMQ: if you use ZeroMQ for communication between processes in a multiprocess system, it is close to trivial to scale the code to run on multiple servers.
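
For example, the only thing a worker needs in order to run on another machine is a different host in its connect calls – something like this hypothetical helper, where the hostname is made up:

import zmq

def connect_worker_sockets(host="127.0.0.1"):
    # the same socket setup as the worker above, with the host factored out
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://%s:5557" % host)
    results_sender = context.socket(zmq.PUSH)
    results_sender.connect("tcp://%s:5558" % host)
    control_receiver = context.socket(zmq.SUB)
    control_receiver.connect("tcp://%s:5559" % host)
    control_receiver.setsockopt(zmq.SUBSCRIBE, b"")
    return work_receiver, results_sender, control_receiver

# on a remote box, point the sockets at the machine running the
# ventilator and results manager (hostname is hypothetical):
# sockets = connect_worker_sockets("ventilator-host")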

This only begins to scratch the surface of ZeroMQ. ZeroMQ supports multiple transports and many more topologies, and the library is available for multiple languages. It is a fantastic tool for both distributed computing and polyglot programming.
