PostgreSQL 9, LISTEN / NOTIFY, and JRuby

The LISTEN / NOTIFY feature of PostgreSQL was substantially improved in the 9.0 release. Since I’ve been experimenting with JRuby lately, I decided to write up some example code that uses LISTEN / NOTIFY via JDBC.

LISTEN / NOTIFY provides a publish / subscribe message bus within PostgreSQL. A client registers as a subscriber on a channel with LISTEN, and from then on receives every notification sent to that channel.

9.0 added a “payload” option to NOTIFY: a notification can now carry an optional text string.
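
As a rough illustration of the payload, here is a minimal sketch of a helper that prepares a parameterized pg_notify call. The helper name build_notify and the constant MAX_PAYLOAD_BYTES are made up for this example; the 8000-byte figure is the limit the PostgreSQL documentation gives for the default configuration. The helper only builds the SQL and its bind parameters, so it would still need to be run through a JDBC prepared statement.

```ruby
# Hypothetical helper: builds the SQL and bind parameters for a pg_notify call.
# In the default configuration the payload must be shorter than 8000 bytes.
MAX_PAYLOAD_BYTES = 8000

def build_notify(channel, payload = '')
  if payload.bytesize >= MAX_PAYLOAD_BYTES
    raise ArgumentError, "payload too large (#{payload.bytesize} bytes)"
  end
  # pg_notify(channel, payload) is the function form of NOTIFY
  ['SELECT pg_notify(?, ?)', [channel, payload]]
end

sql, params = build_notify('watchers', 'watched_table,id,42')
puts sql
puts params.inspect
```

Keeping the channel and payload as bind parameters avoids having to quote them by hand.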

I worked up a quick test of using this functionality in JRuby.

The example sets up the following table in a local PostgreSQL 9.0 database:

CREATE TABLE watched_table (
  id              SERIAL PRIMARY KEY,
  value           INT,
  date_updated    TIMESTAMP NOT NULL DEFAULT now()
);

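It then sets up a trigger that sends a notification on the watchers channel whenever a row is inserted. The payload is the table name, the literal text id, and the new row's id, separated by commas: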

CREATE FUNCTION notify_trigger() RETURNS trigger AS $$
BEGIN
  -- payload looks like: watched_table,id,42
  PERFORM pg_notify('watchers', TG_TABLE_NAME || ',id,' || NEW.id);
  -- the return value of an AFTER trigger is ignored
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER watched_table_trigger AFTER INSERT ON watched_table
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();

In the example, the first thread inserts a row into this table every three seconds, ten times in total:

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  # insert ten rows, three seconds apart; each insert fires the trigger
  10.times do
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    stmt.close
    sleep 3
  end
ensure
  insert_conn.close if insert_conn
end
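
JDBC statements and connections hold resources on both the client and the server, so a long-running script may want to close them deterministically. Here is a minimal sketch of one way to do that, assuming only that the resource responds to close. The helper name with_closing and the FakeStatement stand-in are invented for this illustration so the snippet runs without a database:

```ruby
# Hypothetical helper: yields a resource and always closes it afterwards,
# even if the block raises.
def with_closing(resource)
  yield resource
ensure
  resource.close if resource
end

# Stand-in for a JDBC statement so the sketch runs without a database.
class FakeStatement
  attr_reader :closed, :executed

  def initialize
    @closed = false
    @executed = []
  end

  def execute(sql)
    @executed << sql
  end

  def close
    @closed = true
  end
end

stmt = FakeStatement.new
with_closing(stmt) do |s|
  s.execute("INSERT INTO watched_table (value) VALUES (1)")
end
puts stmt.closed
```

With a real connection, the same helper could wrap insert_conn.create_statement in place of the fake object.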

The listener connects, issues LISTEN on the watchers channel, and then polls its connection once a second for new notifications. Each notification's payload contains the id of the inserted record, which the listener thread uses to retrieve that record:

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  while true
    sleep 1
    puts 'polling...'

    notifications = listen_conn.get_notifications || []

    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}"
        puts "param: #{notification.parameter}"
        puts "-------------------------"

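        # the payload looks like "watched_table,id,42"; Integer() raises on
        # anything non-numeric, so only a number reaches the query
        id = Integer(notification.parameter.split(',').last, 10)
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")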

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end

        puts "-------------------------"

        # close the result set before the statement that produced it
        rs.close
        stmt.close
      end
    end
  end
end
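
The payload parsing in the listener can be pulled out into a small function, which also makes it easy to reject malformed payloads before they reach a query. This is a sketch under the assumption that every payload has the table,id,number shape produced by the trigger shown earlier. parse_payload and WatchEvent are names introduced here, not part of any library:

```ruby
# Hypothetical value object for a decoded notification payload.
WatchEvent = Struct.new(:table, :column, :id)

# Parses payloads shaped like "watched_table,id,42".
# Raises ArgumentError if the shape or the id is not what we expect.
def parse_payload(payload)
  table, column, raw_id = payload.to_s.split(',', 3)
  if table.nil? || table.empty? || column != 'id' || raw_id.nil?
    raise ArgumentError, "unexpected payload: #{payload.inspect}"
  end
  # Integer(..., 10) raises ArgumentError for non-numeric input
  WatchEvent.new(table, column, Integer(raw_id, 10))
end

event = parse_payload('watched_table,id,42')
puts "#{event.table} row #{event.id}"
```

Because the id comes back as an Integer, it could also be bound to a prepared statement parameter instead of being interpolated into the SQL string.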

Here’s the code of the entire example:

require 'rubygems'
require 'bundler/setup'
require 'java'

$LOAD_PATH << 'vendor/jars/'
require 'postgresql-9.0-801.jdbc3.jar'

# set up our database connection to the example database...
java_import java.sql.DriverManager
DriverManager.register_driver(org.postgresql.Driver.new)
url = "jdbc:postgresql://localhost/listen_notify_poller"

def insert_thread(url)
  insert_conn = DriverManager.get_connection(url)
  # insert ten rows, three seconds apart; each insert fires the trigger
  10.times do
    stmt = insert_conn.create_statement
    stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
    stmt.close
    sleep 3
  end
ensure
  insert_conn.close if insert_conn
end

def listen_thread(url)
  listen_conn = DriverManager.get_connection(url)
  
  # register our client as a listener on the "watchers" channel...
  stmt = listen_conn.create_statement
  stmt.execute("LISTEN watchers")
  stmt.close

  # check for new notifications once a second...
  while true
    sleep 1
    puts 'polling...'

    # check for notifications
    notifications = listen_conn.get_notifications || []

    # if there are notifications, display a little info on them
    notifications.each do |notification|
      unless notification.nil?
        puts "NOTIFICATION ------------"
        puts "pid: #{notification.pid}" 
        puts "name: #{notification.name}" 
        puts "param: #{notification.parameter}" 
        puts "-------------------------"

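        # retrieve the inserted record this notification was for.
        # the payload looks like "watched_table,id,42"; Integer() raises on
        # anything non-numeric, so only a number reaches the query
        id = Integer(notification.parameter.split(',').last, 10)
        puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
        stmt = listen_conn.create_statement
        rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")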

        while rs.next
          puts "id:\t#{rs.get_int(1)}"
          puts "value:\t#{rs.get_int(2)}"
          puts "record_time:\t#{rs.get_timestamp(3)}"
        end
      
        puts "-------------------------"
      
        # close the result set before the statement that produced it
        rs.close
        stmt.close
      end
    end
  end
end

# use distinct variable names so they do not shadow the methods above
inserter = Thread.new { insert_thread(url) }
listener = Thread.new { listen_thread(url) }

listener.join
inserter.join

About taotetek

Sometimes stereotypical but never ironic. You can't stop the signal, Mal. All my opinions are my own, unless I stole them from you.

4 Responses to PostgreSQL 9, Listen / Notify, and Jruby.

  1. is there any similar listen/notify code in python? i am desperately looking for similar functionality for mysql.

  2. Diana says:

    I don’t know who you are, but you helped me very much, by searching the web I couldn’t find any simple example except this one! Used for my java app. So just thanks xD
