PostgreSQL's LISTEN / NOTIFY functionality was substantially improved in the 9.0 release. Since I’ve lately been experimenting with JRuby, I decided to write up an example of using LISTEN / NOTIFY via JDBC.
LISTEN / NOTIFY basically provides a publish / subscribe message bus within PostgreSQL. Clients can register as a subscriber on a channel, and will receive all notifications sent to that channel.
Version 9.0 added an optional “payload” to notifications: a notification can now carry a text string along with the channel name.
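As a rough illustration of the payload, the sketch below prepares the arguments for PostgreSQL's `pg_notify(channel, payload)` function, the function form of NOTIFY. The helper name `build_notify_call` and the constant `MAX_PAYLOAD_BYTES` are hypothetical, and the channel and payload values are just sample data. The 8000-byte ceiling reflects the documented default limit on payload size, so treat it as an assumption if your server is configured differently.

```ruby
# Hypothetical helper, not part of any library: returns the SQL text and
# bind values for a parameterized call to pg_notify(channel, payload).
MAX_PAYLOAD_BYTES = 8000 # assumed default: payloads must be shorter than this

def build_notify_call(channel, payload = '')
  raise ArgumentError, 'channel must not be empty' if channel.to_s.empty?
  if payload.to_s.bytesize >= MAX_PAYLOAD_BYTES
    raise ArgumentError, "payload must be shorter than #{MAX_PAYLOAD_BYTES} bytes"
  end
  # Placeholders keep the payload out of the SQL text itself.
  ['SELECT pg_notify(?, ?)', [channel.to_s, payload.to_s]]
end

sql, binds = build_notify_call('watchers', 'watched_table,id,42')
puts sql
puts binds.inspect
```

With JDBC, the SQL string would go to `prepare_statement`, and the two values would be bound with `set_string(1, ...)` and `set_string(2, ...)` before calling `execute`.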
I worked up a quick test of using this functionality in JRuby.
The example sets up the following table in a local PostgreSQL 9.0 database:
    CREATE TABLE watched_table (
      id           SERIAL PRIMARY KEY,
      value        INT,
      date_updated TIMESTAMP NOT NULL DEFAULT now()
    );
It then sets up a trigger that sends a notification on every insert:

    CREATE FUNCTION notify_trigger() RETURNS trigger AS $$
    DECLARE
    BEGIN
      -- payload format: <table name>,id,<new row id>
      PERFORM pg_notify('watchers', TG_TABLE_NAME || ',id,' || NEW.id);
      RETURN new;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER watched_table_trigger
      AFTER INSERT ON watched_table
      FOR EACH ROW EXECUTE PROCEDURE notify_trigger();
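The trigger packs three comma-separated fields into the payload, for example `watched_table,id,42`. A minimal sketch of unpacking that string on the client side might look like the following. The names `NotifiedRow` and `parse_payload` are hypothetical, and the sketch assumes the id is always a whole number, as it is for a SERIAL column.

```ruby
# Hypothetical helper: splits a payload such as "watched_table,id,42"
# (the format built by the notify_trigger function) into its three fields.
NotifiedRow = Struct.new(:table, :column, :value)

def parse_payload(payload)
  table, column, value = payload.to_s.split(',', 3)
  if [table, column, value].any? { |part| part.nil? || part.empty? }
    raise ArgumentError, "unexpected payload: #{payload.inspect}"
  end
  # Integer() raises ArgumentError for anything that is not a whole number,
  # so a malformed id never reaches a SQL statement.
  NotifiedRow.new(table, column, Integer(value, 10))
end

row = parse_payload('watched_table,id,42')
puts "table=#{row.table} column=#{row.column} value=#{row.value}"
```

Converting the id to an integer up front is a defensive choice: the payload is only text, and the listener later uses the id in a query.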
In the example, the first thread inserts into this table every three seconds:
    def insert_thread(url)
      insert_conn = DriverManager.get_connection(url)
      cnt = 0
      # insert ten rows, one every three seconds
      while cnt < 10
        stmt = insert_conn.create_statement
        stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
        stmt.close
        cnt = cnt + 1
        sleep 3
      end
    end
The listener then connects, and polls PostgreSQL every second to see if there are new notifications. If there are, the id of the inserted record is sent in the notification, and the listener thread retrieves the record:
    def listen_thread(url)
      listen_conn = DriverManager.get_connection(url)

      # register this connection as a listener on the "watchers" channel
      stmt = listen_conn.create_statement
      stmt.execute("LISTEN watchers")
      stmt.close

      while true
        sleep 1
        puts 'polling...'

        # get_notifications may return nil when nothing is pending
        notifications = listen_conn.get_notifications || []
        notifications.each do |notification|
          unless notification.nil?
            puts "NOTIFICATION ------------"
            puts "pid: #{notification.pid}"
            puts "name: #{notification.name}"
            puts "param: #{notification.parameter}"
            puts "-------------------------"

            # the payload is "<table>,id,<id>", so the id is the last field
            id = notification.parameter.split(',').last
            puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
            stmt = listen_conn.create_statement
            rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")
            while rs.next
              puts "id:\t#{rs.get_int(1)}"
              puts "value:\t#{rs.get_int(2)}"
              puts "record_time:\t#{rs.get_timestamp(3)}"
            end
            puts "-------------------------"
            # close the result set before the statement that produced it
            rs.close
            stmt.close
          end
        end
      end
    end
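The polling loop can also be separated from the database so that it is easier to test and to stop. The sketch below is one possible shape, not the approach used in this example: `poll_notifications` is a hypothetical helper that takes any callable returning a batch of notifications (or nil), and the `max_polls` option is an assumption added so the loop can end instead of running forever.

```ruby
# Hypothetical helper: calls `source` once per interval and yields every
# non-nil notification it returns. `source` is any object responding to
# #call, for example -> { listen_conn.get_notifications } under JRuby.
def poll_notifications(source, interval: 1, max_polls: nil)
  polls = 0
  loop do
    break if max_polls && polls >= max_polls
    batch = source.call || [] # a nil batch means nothing was pending
    batch.each { |notification| yield notification unless notification.nil? }
    polls += 1
    sleep interval
  end
  polls
end

# Stand-in source for demonstration: three canned batches, then nil.
batches = [nil, ['watched_table,id,1', nil, 'watched_table,id,2'], []]
poll_notifications(-> { batches.shift }, interval: 0, max_polls: 3) do |payload|
  puts "got #{payload}"
end
```

Passing the source in as a lambda keeps the loop free of JDBC calls, so the same code can be exercised with canned data as above.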
Here’s the code of the entire example:
    require 'rubygems'
    require 'bundler/setup'
    require 'java'

    $LOAD_PATH << 'vendor/jars/'
    require 'postgresql-9.0-801.jdbc3.jar'

    # set up our database connection to the example database...
    java_import java.sql.DriverManager
    DriverManager.register_driver(org.postgresql.Driver.new)
    url = "jdbc:postgresql://localhost/listen_notify_poller"

    def insert_thread(url)
      insert_conn = DriverManager.get_connection(url)
      cnt = 0
      while cnt < 10
        stmt = insert_conn.create_statement
        stmt.execute("INSERT INTO watched_table (value) VALUES (1)")
        stmt.close
        cnt = cnt + 1
        sleep 3
      end
    end

    def listen_thread(url)
      listen_conn = DriverManager.get_connection(url)

      # register our client as a listener on the "watchers" channel....
      stmt = listen_conn.create_statement
      stmt.execute("LISTEN watchers")
      stmt.close

      # check for new notifications once a second...
      while true
        sleep 1
        puts 'polling...'

        # check for notifications
        notifications = listen_conn.get_notifications || []

        # if there are notifications, display a little info on them
        notifications.each do |notification|
          unless notification.nil?
            puts "NOTIFICATION ------------"
            puts "pid: #{notification.pid}"
            puts "name: #{notification.name}"
            puts "param: #{notification.parameter}"
            puts "-------------------------"

            # retrieve the inserted record this notification was for.
            id = notification.parameter.split(',').last
            puts "RETRIEVING RECORD FOR NOTIFIED ID: #{id}"
            stmt = listen_conn.create_statement
            rs = stmt.execute_query("SELECT * FROM watched_table WHERE id = #{id}")
            while rs.next
              puts "id:\t#{rs.get_int(1)}"
              puts "value:\t#{rs.get_int(2)}"
              puts "record_time:\t#{rs.get_timestamp(3)}"
            end
            puts "-------------------------"
            # close the result set before the statement that produced it
            rs.close
            stmt.close
          end
        end
      end
    end

    insert_thread = Thread.new { insert_thread(url) }
    listen_thread = Thread.new { listen_thread(url) }

    # the listener loops forever, so stop the script with Ctrl-C
    listen_thread.join
    insert_thread.join
Is there any similar listen/notify code in Python? I am desperately looking for similar functionality for MySQL.
Harshad – the LISTEN / NOTIFY message bus is a feature of PostgreSQL. See http://www.postgresql.org/docs/9.1/static/sql-notify.html and http://www.postgresql.org/docs/9.1/interactive/sql-listen.html.
I hope I am able to translate this Ruby code into Python. Please let me know if you know of any similar script or app that does the same.
I don’t know who you are, but you helped me very much, by searching the web I couldn’t find any simple example except this one! Used for my java app. So just thanks xD