pgjdbc-ng for Postgresql LISTEN and NOTIFY

Using Postgresql LISTEN pgjdbc-ng  to Improve Overhead

When using Postgresql LISTEN and NOTIFY, pgjdbc-ng is the perfect solution for getting notified on table changes.   I had a need to get notified when a table has a row inserted into it. Once notified I can then go read additional data to complete my processing. The initial idea was to poll the database every 10 seconds to see if a new row was inserted. This polling would work but too much overhead. pgjdbc-ng is what I needed.

Just What I Needed

Looking around I found that Postgresql has LISTEN and NOTIFY statements that support the event handling I am looking for. One issue, the default Postgresql JDBC driver doesn’t block waiting for the event to happen. Instead you need to continue to poll the database for the events. Better on the overhead since no query is actually executed but not optimal. I found that pgjdbc_ng is a re-written JDBC driver that does block and wait for an event to happen. Perfect solution! Also, as a side note the Postgresql C libraries work a bit different. Here is an example of the C implementation.

Whats needed to make this work using pgjdbc-ng

You will need a Table to monitor, a PSQL Function to send the notify event with a payload, a PSQL Trigger to call the above function on a row insert and Java code to listen for events.  

1 – Table to monitor

CREATE TABLE dm_queue ( id integer, domainid integer, command character varying(1024) );

2 – PSQL Function to send the notify event with a payload

CREATE OR REPLACE FUNCTION queue_event() RETURNS TRIGGER AS $$ DECLARE data json; notification json; BEGIN -- Convert the old or new row to JSON, based on the kind of action. -- Action = DELETE? -> OLD row -- Action = INSERT or UPDATE? -> NEW row IF (TG_OP = 'DELETE') THEN data = row_to_json(OLD); ELSE data = row_to_json(NEW); END IF; -- Contruct the notification as a JSON string. notification = json_build_object( 'table',TG_TABLE_NAME, 'action', TG_OP, 'data', data); -- Execute pg_notify(channel, notification) PERFORM pg_notify('q_event',notification::text); -- Result is ignored since this is an AFTER trigger RETURN NULL; END; $$ LANGUAGE plpgsql;

3 – PSQL Trigger to call the above function on a row insert

CREATE TRIGGER queue_notify_event AFTER INSERT ON dm_queue FOR EACH ROW EXECUTE PROCEDURE queue_event();

4 – Java code to listen for events

import java.sql.SQLException; import java.sql.Statement; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.impossibl.postgres.api.jdbc.PGConnection; import com.impossibl.postgres.api.jdbc.PGNotificationListener; import com.impossibl.postgres.jdbc.PGDataSource; /** * This program uses the pgjdbc_ng driver which has an asynchronous * implementation for blocking on the Postgres NOTIFY/LISTEN events. * * No polling is done using this driver. You will see a forever loop * "while(true)" in the main(). This is done to keep the program running * and listening to multiple events happening in Postgres. Normally you * would just take one event and then do something with it. * */ public class ListenNotify { // Create the queue that will be shared by the producer and consumer private BlockingQueue queue = new ArrayBlockingQueue(10); // Database connection PGConnection connection; public ListenNotify() { // Get database info from environment variables String DBHost = System.getenv("DBHost"); String DBName = System.getenv("DBName"); String DBUserName = System.getenv("DBUserName"); String DBPassword = System.getenv("DBPassword"); // Create the listener callback PGNotificationListener listener = new PGNotificationListener() { @Override public void notification(int processId, String channelName, String payload) { // Add event and payload to the queue queue.add("/channels/" + channelName + " " + payload); } }; try { // Create a data source for logging into the db PGDataSource dataSource = new PGDataSource(); dataSource.setHost(DBHost); dataSource.setPort(5432); dataSource.setDatabase(DBName); dataSource.setUser(DBUserName); dataSource.setPassword(DBPassword); // Log into the db connection = (PGConnection) dataSource.getConnection(); // add the callback listener created earlier to the connection connection.addNotificationListener(listener); // Tell Postgres to send NOTIFY q_event to our connection and listener Statement statement = connection.createStatement(); statement.execute("LISTEN q_event"); statement.close(); } catch (SQLException e) { e.printStackTrace(); } } /** * @return shared queue */ public BlockingQueue getQueue() { return queue; } /** * * main entry point * * @param args */ public static void main(String[] args) { // Create a new listener ListenNotify ln = new ListenNotify(); // Get the shared queue BlockingQueue queue = ln.getQueue(); // Loop forever pulling messages off the queue while (true) { try { // queue blocks until something is placed on it String msg = queue.take(); // Do something with the event System.out.println(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }

Testing it out

Start the Java program running. Make sure to have the environment variables set so the program can connect to the database.

Next, go into pgAdmin or psql and run:

insert into dm_queue values (6,6,'here');

You should see the following output:
/channels/q_event {“table” : “dm_queue”, “action” : “INSERT”, “data” : {“domainid”:6,”id”:6,”command”:”here”}}

The payload is formatted into JSON by the Function and contains the full row that was inserted. Events can by issued on INSERT, UPDATE, and DELETE by just modifying the Trigger’s AFTER clause.
The Java program is in an infinite loop in order to test the notifies over and over.

Leave a Reply