Go-ing from PostgreSQL rows to RabbitMQ messages

How do you turn events in a database into messages in a message queue? I was pondering this question recently and decided to answer it using PostgreSQL and RabbitMQ. The hypothetical use case: when URL records are inserted into a table, each representing a request for work to be done, we want the URL sent on to a message queue so it can be picked up by a worker application.

Prepare the table and triggers

Let's start with our work source, a table called urls, which we can create like so:

CREATE TABLE urls (id serial primary key, url varchar);  

So now, how can we pick up those inserts as they happen? An old school way would be to poll the urls table for new rows. That's hardly real-time, it makes the work requests only as granular as the polling interval, and it would take some management to ensure you weren't double-sending or missing updates.

A more effective way to pick up the fact that records are being inserted is to use a TRIGGER. This is a standard feature of SQL. You can create a trigger which executes something when there's an insert, update or delete. In our case we want to spot when something is being inserted and call a function saved on the server. Here's some SQL to do just that:

CREATE TRIGGER urlbefore BEFORE INSERT ON urls  
    FOR EACH ROW EXECUTE PROCEDURE notify_trigger();

We are creating a trigger and naming it urlbefore. The next thing we have to ask ourselves is whether we want to act on an event before, after or even instead of it being committed to the database. The events in PostgreSQL are INSERT, UPDATE, DELETE and TRUNCATE; you can pick up just one event, or any combination of an "OR"ed set. We're not worried about those other events here, so we'll stick with BEFORE INSERT ON and then name the table where these events are expected to occur.
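As an aside, if we also wanted to catch changes to existing rows, a hedged sketch of an "OR"ed trigger might look like this (urlchange is just an illustrative name):

CREATE TRIGGER urlchange BEFORE INSERT OR UPDATE ON urls
    FOR EACH ROW EXECUTE PROCEDURE notify_trigger();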

Next, we specify FOR EACH ROW so that every row inserted fires the trigger's payload. If we didn't say that, the default is FOR EACH STATEMENT, which would mean a bulk insertion in one SQL statement would only fire the trigger payload once – not what we want at all.
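To make that concrete: with FOR EACH ROW in place, a bulk insert like this (the URLs are placeholders) fires the trigger, and therefore a notification, three times; with FOR EACH STATEMENT it would fire just once:

INSERT INTO urls(url) VALUES
    ('https://example.com/a'),
    ('https://example.com/b'),
    ('https://example.com/c');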

Finally, the payload is a PostgreSQL function... EXECUTE PROCEDURE notify_trigger() which means we now have to create a function which will do our notification.

A Time to NOTIFY

We'll use PL/pgSQL because it's built in and has everything we need. Here we go:

CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$  
BEGIN  
    PERFORM pg_notify('urlwork', row_to_json(NEW)::text);
    RETURN NEW;
END;  
$$ LANGUAGE plpgsql;

Let's look at the function declaration first. That's this section:

CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$  
........
$$ LANGUAGE plpgsql;

We're making a function, so we CREATE OR REPLACE it to ensure that if we've previously created a function of the same name, we overwrite it. Never do this in production code, only in examples - in production you don't know if your CREATE OR REPLACE could end up replacing core business logic with "Hello World".

Next, we name our function notify_trigger() and then say what it is going to return. As a trigger function, it returns the trigger type, which can be either NULL or the row that fired the trigger. We'll come to that later. This is followed by AS $$ where the $$ denotes the start of our code block. What kind of code? Well, if we skip past it to the end, where the next $$ denotes the end of the code block, we can see it's followed by LANGUAGE plpgsql; so this is going to be PL/pgSQL code. Let's have a look at that code now:

BEGIN  
    PERFORM pg_notify('urlwork', row_to_json(NEW)::text);
    RETURN NEW;
END;  

The BEGIN and END should be obvious in their function, though the former can be preceded by a DECLARE block for defining variables. Let's move to the actual working statements within.
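We don't need a DECLARE block here, but as a hedged illustration of where one would go, a variant of the function with a local variable (the payload variable is purely for show) might look like this:

CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$
DECLARE
    payload text;
BEGIN
    payload := 'inserted ' || NEW.url;
    PERFORM pg_notify('urlwork', payload);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;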

We're now at the point where we can create a notification message for listening applications. PostgreSQL lets you create channels which you can send messages down. Applications can listen on those channels and be notified when new messages appear. There are some restrictions on what you can send: a maximum of 8000 bytes in the payload and an 8GB notification queue by default. Assuming you aren't going LISTEN/NOTIFY crazy, that's good enough for many applications.

If you look at the PostgreSQL syntax there are LISTEN and NOTIFY statements. They aren't standard SQL but are unique to PostgreSQL. They also aren't the easiest things to work with - NOTIFY, as a statement, only takes literal channel names and payloads, so you can't hand it a variable. With that in mind, the developers wrapped NOTIFY in a pg_notify() function which takes the channel name and a payload for the message as ordinary arguments. The next question is what to put in the payload...
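As a quick sanity check, assuming the table and trigger function above are in place, you can watch a notification arrive in a plain psql session:

LISTEN urlwork;
INSERT INTO urls(url) VALUES ('https://example.com');

Once the insert commits, psql prints something like Asynchronous notification "urlwork" with payload "{"id":1,"url":"https://example.com"}" received from server process... (the id will depend on your table).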

When you are creating a trigger function, there's a whole set of variables also passed to the function. How you access them depends on your language. We're using PL/pgSQL and, for that, the PostgreSQL documentation has a page on the special variables available. First on the list is NEW, which contains the row being inserted. For this exercise, that's all we'll need, but take a moment to browse the other variables.
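To give a flavor of those other variables, here's a hedged fragment using TG_OP and TG_TABLE_NAME to prefix the payload with what happened and where (using the url column directly for now):

PERFORM pg_notify('urlwork', TG_OP || ' on ' || TG_TABLE_NAME || ': ' || NEW.url);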

So, we have the row data and we want to turn it into some useful and easily parsed format. Stand down, comma-delimited fields: PostgreSQL does JSON and, more specifically, has a function row_to_json() which will convert a row into a JSON string using the column names as JSON keys. The result of row_to_json() is a value of the json type, so we need to convert it to text before we send it by casting it - ::text. Put that all together and you get:

PERFORM pg_notify('urlwork', row_to_json(NEW)::text);  
RETURN NEW;  

We use the PERFORM command because we aren't interested in any results. We end our function with RETURN NEW to send back the unchanged row for committing to the database - and yes, that means the notification is queued up before the write, but if that becomes an issue, we can always switch to AFTER INSERT.
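If we did want to switch, it's just a matter of recreating the trigger; a sketch, with urlafter as an illustrative name:

DROP TRIGGER IF EXISTS urlbefore ON urls;
CREATE TRIGGER urlafter AFTER INSERT ON urls
    FOR EACH ROW EXECUTE PROCEDURE notify_trigger();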

A moment to Go

It's about now that we should talk about the client program, our Go application. We've created a small example app called postrabbit. Once built, it takes its configuration from a prcreds.yaml file containing URLs for your PostgreSQL and RabbitMQ systems. If you run postrabbit --help you will see there are three commands: setup, run and add - there's also help, but we don't need to explain that.

The setup command runs all the SQL setup we've seen. The add command takes a URL as a parameter and adds it to our urls table. If we have a quick look at that (less the error checking you'll find in the actual Go file), you will find it simply opens and writes to the database through Go's database/sql package:

package main

import (
    "database/sql"

    _ "github.com/lib/pq" // imported for its side effect: registering the postgres driver
)

// urlarg holds the URL passed on the command line; it's set elsewhere in the full code
var urlarg string

func add(config Config) {
    db, _ := sql.Open("postgres", config.PostgresURL)
    // Insert the URL - the trigger and notification take care of the rest
    db.Exec(`INSERT INTO urls(url) VALUES($1)`, urlarg)
    db.Close()
}

The interesting stuff is all in the run command where we actually listen for the notifications.

And a time to LISTEN

We'll step through the run command here, from the beginning:

package main

import (  
    "crypto/tls"
    "log"
    "time"

    "github.com/lib/pq"
    "github.com/streadway/amqp"
)

The first thing to note here is that we are being explicit about importing "github.com/lib/pq". Most client applications will, as the documentation says, go through the database/sql package, but we import it directly here so we can make use of one of its particular, unique types, the Listener.

func errorReporter(ev pq.ListenerEventType, err error) {  
    if err != nil {
        log.Print(err)
    }
}

The errorReporter function exists to support the Listener type by handling errors it emits. Here we just print the error - if, that is, there is an error to report. Let's move on to the heart of the code:

func run(config Config) {  
    listener := pq.NewListener(config.PostgresURL, 10*time.Second, time.Minute, errorReporter)
    err := listener.Listen("urlwork")
    if err != nil {
        log.Fatal(err)
    }

That's it, the entire code for configuring a listener. We create a NewListener using the pq package and hand it a PostgreSQL URL, a minimum and maximum reconnect interval, and an error reporting function; the Listener will mostly manage reconnections itself. This gives us our listener, on which we can call Listen() to follow a notification channel. Back up in our SQL code, you'll recall we created the channel "urlwork", so that's what we'll listen to.

Rabbiting

Next in our example code, we create a goroutine to handle all of the interaction with Rabbit. We're going to use a Go channel of strings to send requests to Rabbit, so let's create that now:

    rabbitchannel := make(chan string, 100)

Next, we begin to define the goroutine. The first thing this routine has to do is open a connection to the RabbitMQ system.

    go func() {
        cfg := new(tls.Config)
        cfg.InsecureSkipVerify = true // accept the server certificate unverified - fine for an example, not production
        conn, err := amqp.DialTLS(config.RabbitMQURL, cfg)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

This is a shorter version of what we covered in a previous RabbitMQ article - shorter because we're not verifying the server certificate. Next we open a channel to the RabbitMQ server:

        ch, err := conn.Channel()
        if err != nil {
            log.Fatal(err)
        }
        defer ch.Close()

Now we can loop forever... first reading, or waiting to read, the next item off the rabbitchannel, then processing it as we need. It'll be JSON data, so we could unmarshal it and make use of the various fields. In this case, though, we'll just print it and then publish it to a RabbitMQ "urlwork" exchange as plain text:

        for {
            payload := <-rabbitchannel
            log.Println(payload)
            err := ch.Publish("urlwork", "todo", false, false, amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(payload),
            })
            if err != nil {
                log.Print(err) // log a failed publish but keep consuming
            }
        }
    }()

And once that's defined, it will start running. In the normal flow of things, it'll open the RabbitMQ connection and then block, waiting for something on the rabbitchannel.

Now we need to fill that rabbitchannel, so we'll go into another loop. This time we'll use Go's select...case, which lets us wait on multiple communications channels. In the first case, we wait for something to come from the listener; that'll be a JSON message which we extract from the notification and send on into the rabbitchannel. In the second case, we wait for a 90 second timer to expire before pinging the listener. If we get an error from the ping, we'll bail out there, as something is up with the connection.

    for {
        select {
        case notification := <-listener.Notify:
            // lib/pq delivers a nil notification after it re-establishes
            // a dropped connection, so guard before dereferencing
            if notification == nil {
                continue
            }
            rabbitchannel <- notification.Extra
        case <-time.After(90 * time.Second):
            go func() {
                err := listener.Ping()
                if err != nil {
                    log.Fatal(err)
                }
            }()
        }
    }

And that's it. That loop reads notifications from the PostgreSQL Listener and puts the results into the Go channel for RabbitMQ, where the other loop picks them up and posts them on to a selected RabbitMQ exchange. What's probably more interesting is how much of the actual work is done within PostgreSQL; the insert detection, message queueing and conversion to JSON all happen there, leaving our Go code to pick up the message and pass it on.

And Running

You can download the postrabbit code from GitHub and build it. You'll need to set URLs for the PostgreSQL and RabbitMQ systems in the prcreds.yaml file. Once you've done that, you can then:

$ postrabbit setup
$ postrabbit run

in one session and in another do...

$ postrabbit add https://compose.com/articles

The first session should show you the JSON being picked up and sent on like this:

2015/11/17 14:34:05 {"id":10,"url":"https://compose.com/articles"}  

Before we did that though, we went and bound a queue to the urlwork exchange in RabbitMQ with a routing key of todo. That lets us examine what arrives in RabbitMQ and, sure enough:

There's our message ready for consuming... The connection is made.
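If you'd rather do that check in code than through the management UI, here's a minimal sketch of a consumer, assuming the urlwork exchange already exists; the todo-queue name and the amqp:// URL are placeholders for your own:

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func main() {
    // Placeholder URL - substitute your own RabbitMQ credentials
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // A queue to hold the messages...
    q, err := ch.QueueDeclare("todo-queue", true, false, false, false, nil)
    if err != nil {
        log.Fatal(err)
    }
    // ...bound to the urlwork exchange with the todo routing key
    if err := ch.QueueBind(q.Name, "todo", "urlwork", false, nil); err != nil {
        log.Fatal(err)
    }

    // Consume with auto-acknowledgement and print each message as it arrives
    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    if err != nil {
        log.Fatal(err)
    }
    for msg := range msgs {
        log.Printf("received: %s", msg.Body)
    }
}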

Hopefully, taking on this problem has shed some light on PostgreSQL's NOTIFY/LISTEN functionality and on the power of the SQL database to be more than just a passive store that responds to queries. It should also show how natural Go's goroutines are for concurrency.