Streaming Updates from Postgres

Tue 13 January 2015

This post is about how to publish data from Postgres in more-or-less real time. Understanding why I would want to do that will require telling a story...

Once Upon a Time

Our survey system began life at a company named Polimetrix in Palo Alto in the mid-aughts. Polimetrix got bought by YouGov, and the survey system went from supporting a single country (the US) and a single office to supporting all of YouGov's regions and offices. The team, working in a single language (Python), with a single database (Postgres), turned into multiple teams in multiple offices working with multiple languages.

The survey system started to get broken into separate services: web frontends, a transactional database (the legacy Postgres instance), analytics databases that needed to be synced from the transactional one, a metrics aggregation service, and more. Many of those things didn't actually need to be involved in generating web pages, but they needed to be notified about what happened on them.

This quickly became a mess. Any service doing something interesting was at risk of its code being littered with calls to all these other metrics/analytics services that it didn't really care about. And if we wanted to change to a new analytics DB or metrics aggregator, we had to rip out all the old calls and replace them with the new thing. That sucked.

So around a year ago, we decided to take a different approach. Applications would emit streams of events to a queue or pubsub, without caring who was subscribing to them (if anyone). We patched a few systems to work this way, with good results, but the refactoring was still slow going, as it competed with a lot of other demands for developer time.

Which brings us back to Postgres. What if instead of patching all of the services that talk to Postgres, we could bolt something onto the database itself that would capture events which we could then push to RabbitMQ? We've found a number of ways to do that, each with different tradeoffs, and some of them subtly but definitely wrong.

Option 1: Polling

Suppose you've got an append-only "points_transactions" table like this:

 Column  |           Type           |                            Modifiers
---------+--------------------------+------------------------------------------------------------------
 id      | integer                  | not null default nextval('points_transactions_id_seq'::regclass)
 user_id | integer                  |
 amount  | integer                  |
 time    | timestamp with time zone | default now()

If you wanted some other service to stay in sync with how many points people had, it could run a query like this once every few seconds:

SELECT * FROM points_transactions WHERE id>(the last ID that I got)

At first that looks like a pretty solid solution. It doesn't matter if your consumer has been down for days, you'll still be able to pick up where you left off (assuming the consumer can remember the last ID).

But I got a good lesson in the subtleties of MVCC when I proposed this to one of our DBAs. Imagine the following sequence of events:

  1. Service A starts a transaction.
  2. Service A does an INSERT INTO points_transactions. At this point its 'id' field is determined by Postgres by getting the next integer in the sequence.
  3. Service B starts a transaction, and does a similar INSERT, also getting an 'id' from the sequence.
  4. Service B COMMITs its transaction.
  5. The polling sync service requests new rows and remembers the ID from Service B's INSERT as the highest one it has seen.
  6. Service A COMMITs its transaction.

Now you have a problem. The next time the sync service asks for new rows, the ID that it requests with will be too high to select Service A's insert, and that row won't get synced.
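The sequence above is easy to replay in a toy model. The sketch below is plain Python, not Postgres: `FakeTable` and `PollingConsumer` are hypothetical stand-ins that only capture the two facts that matter here, namely that IDs are handed out at INSERT time while rows become visible at COMMIT time.

```python
import itertools


class FakeTable:
    """Toy model: IDs are allocated at INSERT, rows become visible at COMMIT."""

    def __init__(self):
        self._seq = itertools.count(1)
        self._committed = {}  # id -> row, the only rows a reader can see

    def insert(self, row):
        """Allocate the next sequence value now; the row stays invisible."""
        return next(self._seq), row

    def commit(self, pending):
        row_id, row = pending
        self._committed[row_id] = row

    def select_after(self, last_id):
        """Stand-in for: SELECT * FROM points_transactions WHERE id > last_id"""
        return sorted((i, r) for i, r in self._committed.items() if i > last_id)


class PollingConsumer:
    def __init__(self, table):
        self.table = table
        self.last_id = 0
        self.synced_ids = []

    def poll(self):
        for row_id, _row in self.table.select_after(self.last_id):
            self.synced_ids.append(row_id)
            self.last_id = row_id


table = FakeTable()
consumer = PollingConsumer(table)

a = table.insert({"user_id": 1, "amount": 10})  # steps 1-2: Service A gets id 1
b = table.insert({"user_id": 2, "amount": 20})  # step 3: Service B gets id 2
table.commit(b)                                 # step 4
consumer.poll()                                 # step 5: only id 2 is visible
table.commit(a)                                 # step 6
consumer.poll()                                 # asks for id > 2, so id 1 is skipped

print(consumer.synced_ids)  # [2]
```

Row 1 is committed and sitting in the table, but the consumer never sees it.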

If the write rate on the table is really low, this problem might not crop up, but under any kind of load I wouldn't trust it.


Option 2: Pubsub

Postgres has a pubsub system built in. If you want to publish a message each time a row is inserted to points_transactions, you can write a trigger like this:

```
CREATE OR REPLACE FUNCTION notify_new_points_txn() RETURNS trigger AS $$
DECLARE
  payload text;
BEGIN
  -- We can publish JSON payloads!
  payload := row_to_json( NEW )::text;
  IF octet_length( payload ) >= 8000 THEN
    -- NOTIFY payloads must be shorter than 8000 bytes, so send a stub instead
    payload := ('{"id": "' || NEW.id || '", "error": "too long"}')::json::text;
  END IF;
  PERFORM pg_notify( 'points_txn_insert'::text, payload );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER notify_new_points_txn
  AFTER INSERT ON points_transactions
  FOR EACH ROW EXECUTE PROCEDURE notify_new_points_txn();
```

You could then subscribe to real-time INSERT events in another process with this:

LISTEN points_txn_insert;

The nice thing about this is that it's more real-time, and you don't have the overhead of constant polling on a table with few writes.

But it's even easier to miss an update with this than with the polling solution. If the listener app goes down, it will have no way to recover the events that it missed when it comes back up.
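For a sense of what the listening process might look like, here is a minimal sketch in Python. It assumes psycopg2 as the driver and follows its usual notification pattern (`select()` on the connection, then `poll()` and `notifies`). The function names, the `dsn` argument, and the `handle` callback are my own placeholders. The demo at the bottom uses a stand-in object rather than a real connection, so the decoding step can be tried without a database.

```python
import json
import select
from types import SimpleNamespace


def drain_notifications(conn):
    """Decode every notification currently queued on a psycopg2-style connection."""
    conn.poll()  # reads pending input and fills conn.notifies
    events = []
    while conn.notifies:
        events.append(json.loads(conn.notifies.pop(0).payload))
    return events


def listen_forever(dsn, handle, timeout=5.0):
    """Hypothetical main loop: LISTEN, then hand each decoded row to `handle`."""
    import psycopg2  # imported lazily; only needed when talking to a real server
    import psycopg2.extensions

    conn = psycopg2.connect(dsn)
    # Autocommit, so the LISTEN takes effect immediately.
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    conn.cursor().execute("LISTEN points_txn_insert;")
    while True:
        # Sleep until the socket is readable or the timeout expires.
        readable, _, _ = select.select([conn], [], [], timeout)
        if readable:
            for event in drain_notifications(conn):
                handle(event)


# Offline demo with a stand-in connection (not a real psycopg2 object).
fake_conn = SimpleNamespace(
    poll=lambda: None,
    notifies=[SimpleNamespace(payload='{"id": 1, "user_id": 12345, "amount": 500}')],
)
events = drain_notifications(fake_conn)
print(events)
```

Nothing in this loop remembers what it has already seen, which is exactly why a restart loses whatever was published in the meantime.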

Option 3: Add acks

The problem of missing events in the polling and pubsub options can be addressed by making Postgres do a little more work. Suppose we add an 'unpublished_points_transactions' table:

postgres=# CREATE TABLE unpublished_points_transactions ( txn_id INTEGER REFERENCES points_transactions(id) );

Now tweak the trigger function from the last example, adding an extra line that inserts each new row's ID into the unpublished_points_transactions table as it's written:

CREATE OR REPLACE FUNCTION notify_new_points_txn() RETURNS trigger AS $$
DECLARE
  payload text;
BEGIN
  payload := row_to_json( NEW )::text;
  IF octet_length( payload ) >= 8000 THEN
    -- the full row won't fit in a pg_notify, so send a stub instead
    payload := ('{"id": "' || NEW.id || '", "error": "too long"}')::json::text;
  END IF;
  PERFORM pg_notify( 'points_txn_insert'::text, payload );
  INSERT INTO unpublished_points_transactions (txn_id) VALUES (NEW.id);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Now in the listener process, add two things:

  1. Whenever you hear a new transaction on the pubsub, delete that row from the unpublished table after pushing it to RabbitMQ.
  2. When the listener process starts up, and after starting to listen on the pubsub, query for any unpublished transactions, publish them, and pop them from the table.

This guarantees that each event will be published at least once. (Duplicate publication is possible!)
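The two listener steps can be sketched roughly as follows. This is an illustration under assumptions, not our production code: `cur` is taken to be a DB-API cursor on an autocommit connection (psycopg2-style `%s` placeholders), `publish` is whatever callable pushes an event to RabbitMQ, and the function names are made up. `FakeCursor` exists only so the flow can be exercised without a database.

```python
import json

DELETE_SQL = "DELETE FROM unpublished_points_transactions WHERE txn_id = %s"
BACKLOG_SQL = """
    SELECT row_to_json(p)::text
    FROM points_transactions p
    JOIN unpublished_points_transactions u ON u.txn_id = p.id
    ORDER BY p.id
"""


def publish_and_ack(cur, payload, publish):
    """Publish first, ack second: a crash in between means a duplicate
    publish after restart, never a lost event."""
    event = json.loads(payload)
    publish(event)
    cur.execute(DELETE_SQL, (int(event["id"]),))


def republish_backlog(cur, publish):
    """Run at startup, after LISTEN, to catch anything missed while down."""
    cur.execute(BACKLOG_SQL)
    for (payload,) in cur.fetchall():
        publish_and_ack(cur, payload, publish)


class FakeCursor:
    """In-memory stand-in for a DB-API cursor, for trying the flow offline."""

    def __init__(self, backlog):
        self.backlog = list(backlog)  # payload strings awaiting publication
        self.deleted = []             # txn_ids that have been acked
        self._rows = []

    def execute(self, sql, params=()):
        if sql == DELETE_SQL:
            self.deleted.append(params[0])
        else:
            self._rows = [(p,) for p in self.backlog]

    def fetchall(self):
        return self._rows


published = []
cur = FakeCursor(['{"id": 7, "user_id": 12345, "amount": 500}'])
republish_backlog(cur, published.append)
print(published, cur.deleted)
```

Because the DELETE is idempotent, it doesn't matter if the same row arrives once from the backlog query and again from the pubsub; the consumer on the RabbitMQ side just has to tolerate the duplicate.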

Option 4: Logical Replication

We have Option 3 in production today, but it's still more hassle to set up than I'd like. We have to set up triggers, ack tables, and listener processes for each table we'd like to notify about. It also only works well for append-only tables. Handling updates and deletes would make it a bit more complicated.

Postgres 9.4 has a logical replication feature that can make all the above approaches obsolete. It has crash safety built in, so you don't need to set up your own acks table. It also lets you stream changes in real time.

You turn this on by creating a "replication slot" (example here), which requires that you also tell Postgres what plugin to use to format your output. There's a "test_decoding" plugin included with Postgres, which outputs a plain text summary of each change. There's also a 3rd party decoder_raw plugin, which transforms each change into a SQL INSERT, UPDATE, or DELETE that you can replay on another Postgres server to keep it in sync.

To reproduce the payloads of the pubsub example above though, neither of those decoder plugins is going to work. So this past weekend I hacked together decoder_json, which emits a JSON document for each change to the database. After installing it, you could set it up like this:

$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --create-slot --plugin=decoder_json
$ pg_recvlogical --slot=test_decoder_json_streaming --dbname=postgres --user=postgres --start -f -

Now in a separate terminal, try inserting a points_transaction:

postgres=# INSERT INTO points_transactions (user_id, amount) VALUES (12345, 500);
INSERT 0 1
postgres=# INSERT INTO points_transactions (user_id, amount) VALUES (54321, -100);
INSERT 0 1

You should see output like this in the first terminal:

{"table":"public.points_transactions","op":"INSERT","data":{"id":1,"user_id":12345,"amount":500,""time"":"2015-01-14T07:23:25.649838+00:00"}}
{"table":"public.points_transactions","op":"INSERT","data":{"id":2,"user_id":54321,"amount":-100,""time"":"2015-01-14T07:23:41.097829+00:00"}}

(FYI: I just noticed a bug in that output.)

I'm still fiddling with some pieces, like the ability to connect to a replication slot from a Postgres driver like psycopg2 instead of having to use pg_recvlogical, but I'm excited about this. I think it'll provide an elegant way to grow from a small, monolithic app with a single database into a service-oriented architecture where that app starts to integrate with lots of others.
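Until a driver can attach to the slot directly, one stopgap is to wrap pg_recvlogical in a subprocess and read its output line by line. The sketch below assumes the plugin emits one valid JSON document per line (so it assumes the quoting bug mentioned above is fixed); the function names are hypothetical, and it says nothing about delivery guarantees if the Python side crashes mid-stream.

```python
import json
import subprocess


def recvlogical_argv(slot, dbname, user):
    """Build the same streaming command shown earlier."""
    return [
        "pg_recvlogical",
        f"--slot={slot}",
        f"--dbname={dbname}",
        f"--user={user}",
        "--start",
        "-f", "-",
    ]


def iter_changes(lines):
    """Yield one decoded change per non-blank line of plugin output."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def stream_changes(slot, dbname, user):
    """Run pg_recvlogical and yield changes until the caller stops iterating."""
    proc = subprocess.Popen(
        recvlogical_argv(slot, dbname, user),
        stdout=subprocess.PIPE,
        text=True,
    )
    try:
        yield from iter_changes(proc.stdout)
    finally:
        proc.terminate()
        proc.wait()


# Offline demo: feed the parser canned lines instead of a live process.
sample = [
    '{"table":"public.points_transactions","op":"INSERT",'
    '"data":{"id":1,"user_id":12345,"amount":500}}\n',
    "\n",
]
changes = list(iter_changes(sample))
print(changes[0]["op"], changes[0]["data"]["amount"])
```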

One last thought because I know someone will complain about this: Just as good APIs are designed to present high level business concepts instead of just exposing the underlying storage, a good event stream would publish "points_transaction" events rather than UPDATE/INSERT/DELETE events. Between hearing the SQL event from Postgres, and emitting an event to RabbitMQ, it's a good idea to do some transformation to hide implementation details that are likely to change or that may confuse listeners.
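As a rough illustration of that transformation step, here is a small Python sketch. The shape of the outgoing event (`type`, `occurred_at`, and so on) is invented for the example, and the incoming change is assumed to look like the decoder_json output above with the key quoting fixed.

```python
def to_points_event(change):
    """Map a row-level change to a domain event, or None if it isn't one we publish."""
    if change.get("table") != "public.points_transactions":
        return None
    if change.get("op") != "INSERT":
        return None
    row = change["data"]
    # Expose the business concept; leave out storage details like the row id.
    return {
        "type": "points_transaction",
        "user_id": row["user_id"],
        "amount": row["amount"],
        "occurred_at": row["time"],
    }


change = {
    "table": "public.points_transactions",
    "op": "INSERT",
    "data": {
        "id": 1,
        "user_id": 12345,
        "amount": 500,
        "time": "2015-01-14T07:23:25.649838+00:00",
    },
}
event = to_points_event(change)
ignored = to_points_event({"table": "public.users", "op": "UPDATE", "data": {}})
print(event)
```

Listeners then depend on the event's shape rather than on the table's columns, so a schema change only has to be absorbed in this one function.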