Sharing a Single Streaming Backend Connection Across Multiple WebSockets

Thu 23 July 2015

There was some good discussion on Reddit about my Real Time Web Apps with (just) Python and Postgres video. User b4stien in particular gave a great suggestion:

In the "Getting Big" slide (around 14min) OP is suggesting that we need 1 postgres connection per websocket, why can't we use a single connection for all the websockets instead?

This single connection could then notify all the open connections (websockets) to clients... Right?

This is feasible, at a cost of some complexity. I've given it a try in the ToDo app, and it seems to be working. The principles should be extendable to other kinds of backend streams that may have many frontend subscribers, so I'm writing an explanation here with the hope of helping others who face similar problems, and/or getting feedback on any subtle concurrency issues I might have missed.

Here's the basic approach:

  1. When the application starts up, create a pgpubsub connection on a background thread. (This could just as easily be some other kind of event stream, like a RabbitMQ queue, Redis pubsub, etc.)
  2. Within that background thread, maintain a list of Queue objects, one for each frontend subscriber.
  3. Each time a client makes a new WebSocket request, create a new Queue object for that client and add it to the background thread's subscriber list.
  4. Each time an event comes in on the stream, put it into each of the subscribing Queue objects.
  5. When the WebSocket disconnects, remove the corresponding Queue object from the background thread's subscriber list.
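
As a rough illustration of those five steps, here is a minimal sketch written for Python 3 (where the module is named queue rather than Queue). It is not the ToDo app's fanout module: SimpleFanout and source are hypothetical names, source is any iterable standing in for the backend stream, and for brevity this version guards the subscriber list with a lock.

```python
import queue
import threading


class SimpleFanout:
    """Copies every event from one source to many subscriber queues."""

    def __init__(self, source):
        self._source = source          # any iterable of events (stand-in for the backend stream)
        self._subscribers = []         # one Queue per connected WebSocket
        self._lock = threading.Lock()  # guards the subscriber list
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        # Step 1: consume the stream on a background thread.
        self._thread.start()

    def subscribe(self):
        # Step 3: a new client gets its own Queue, added to the list.
        q = queue.Queue()
        with self._lock:
            self._subscribers.append(q)
        return q

    def unsubscribe(self, q):
        # Step 5: drop the client's Queue when its WebSocket disconnects.
        with self._lock:
            self._subscribers.remove(q)

    def _run(self):
        # Step 4: put each incoming event into every subscriber's Queue.
        for event in self._source:
            with self._lock:
                targets = list(self._subscribers)
            for q in targets:
                q.put(event)
```

A request handler would call subscribe(), read events from the returned Queue, and call unsubscribe() when the client goes away.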

You can see this implemented in the ToDo app's new fanout module.

The tricky and/or interesting bits:

Gunicorn doesn't like early-bound pgpubsub connections.

Initially I tried setting up the background thread in App.__init__. This resulted in a puzzling disconnect from Postgres:

OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

The Postgres logs didn't shed much more light:

2015-07-23 19:38:42 UTC [23432-1] postgres@todos FATAL:  invalid frontend message type 0

I found two ways to make that problem go away:

  1. Make the pgpubsub connection when the first web request arrives instead of in __init__.
  2. Use Werkzeug's HTTP server instead of Gunicorn.

I suspect that the connection is being unsafely passed through a process fork when Gunicorn spawns its workers. For now I've kept Gunicorn and stuck with the first of those fixes.
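
The first fix amounts to deferring the connection until something actually asks for it. Here is a minimal sketch of that idea, assuming a hypothetical zero-argument connect callable that opens the pgpubsub connection. The process ID check is an extra precaution of my own against reusing a connection inherited across a fork; it is not something the ToDo app is known to do.

```python
import os
import threading


class LazyConnection:
    """Opens a connection on first use rather than at construction time."""

    def __init__(self, connect):
        self._connect = connect        # hypothetical factory, e.g. one that opens pgpubsub
        self._lock = threading.Lock()  # avoids two requests both connecting
        self._conn = None
        self._pid = None               # process that created the connection

    def get(self):
        with self._lock:
            # Connect if we never have, or if this process did not create
            # the connection it is holding (assumed fork-safety precaution).
            if self._conn is None or self._pid != os.getpid():
                self._conn = self._connect()
                self._pid = os.getpid()
            return self._conn
```

With this shape, constructing the object before Gunicorn forks is harmless, because nothing is connected until the first request calls get() inside a worker.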

Subscribing

I had a clear idea in mind of how the background thread could fan messages out to multiple WebSockets, but it wasn't immediately obvious how to let that background thread know that a new WebSocket had connected.

My solution was to have the background thread accept a Queue object called "q_in" when initially started up. Then the main thread could send subscribers' Queue objects in on it, as well as "unsubscribe" and "exit" messages.
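
A sketch of what such a control queue might look like, in Python 3. The (command, queue) tuple format and the poll_events callable are assumptions made for illustration: poll_events stands in for whatever waits briefly on the backend stream and returns a list of new events.

```python
import queue

SUBSCRIBE, UNSUBSCRIBE, EXIT = "subscribe", "unsubscribe", "exit"


def fanout_worker(q_in, poll_events):
    """Runs on the background thread, which alone touches the subscriber list."""
    subscribers = []
    while True:
        # Apply any pending control messages without blocking.
        while True:
            try:
                command, q = q_in.get_nowait()
            except queue.Empty:
                break
            if command == SUBSCRIBE:
                subscribers.append(q)
            elif command == UNSUBSCRIBE:
                if q in subscribers:
                    subscribers.remove(q)
            elif command == EXIT:
                return
        # poll_events is expected to wait briefly for events, so this loop
        # does not spin. Copy each new event to every subscriber.
        for event in poll_events():
            for q in subscribers:
                q.put(event)
```

Because only the worker reads or changes the subscriber list, no lock is needed. The main thread's only job is to put messages on q_in.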

Yo dawg, I heard you like Queues

Xzibit provides a surprisingly accurate description of our solution.

Last note on this: I'm using the standard library's threading and Queue interfaces, but in this app they're monkey-patched by Gevent. Without the monkey-patching, the same code would run on native threads, but it would not support as many simultaneous WebSockets.

Iterator Interface for Subscribers

All the housekeeping of setting up, listening to, and tearing down Queue objects is hidden from the request-handling code, which is able to use a clean iterator interface:

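def websocket(self):
    # Each subscriber iterates over its own event stream; the Queue
    # setup and teardown stay hidden inside the fanout object.
    for event in self.app.fanout.subscribe():
        self.ws.send(event.payload)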
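
One way such an iterator could be built is as a generator that registers a Queue on entry and deregisters it in a finally block. This is a sketch in Python 3, not the ToDo app's code: the ("subscribe", q) and ("unsubscribe", q) tuples sent on q_in are an assumed message format.

```python
import queue


def subscribe(q_in):
    """Yield events for one client, deregistering when the generator ends."""
    q = queue.Queue()
    # Generator bodies run lazily, so registration happens on the first
    # iteration rather than when subscribe() is called.
    q_in.put(("subscribe", q))
    try:
        while True:
            yield q.get()  # blocks until the background thread delivers an event
    finally:
        # Runs when the generator is closed, e.g. via close() or when it is
        # garbage-collected after the consuming loop exits.
        q_in.put(("unsubscribe", q))
```

The try/finally is what lets the request handler stay a plain for loop: cleanup is tied to the generator's lifetime rather than to explicit calls in the handler.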

I may extract this code into a standalone fanout library that is not pgpubsub-specific.