Introducing Crunchy Data Warehouse: A next-generation Postgres-native data warehouse. Crunchy Data Warehouse Learn more

Devious SQL: Message Queuing Using Native PostgreSQL

Avatar for David Christensen

David Christensen

8 min read

An interesting question came up on the #postgresql IRC channel about how to use native PostgreSQL features to handle queuing behavior. There are existing solutions for queuing, both in PostgreSQL, with the venerable pgq project, or dedicated message queues like RabbitMQ, Kafka, etc. I wanted to explore what could be done with native Postgres primitives and I thought this warranted an entry in my Devious SQL series.

Requirements

So what makes up a minimal queuing solution? Effectively, we need the following:

  • a table to hold events or items to be processed
  • something to enqueue/put items in the table
  • something to dequeue/consume these items
  • do so without locking

We also want to ensure that any processing that happens only affects things once. We don't want more than one worker process to charge a credit card, for example, and we also want to make sure that if there is an issue when handling an item that it doesn't disappear without being handled.

So what sorts of primitives can we use in PostgreSQL in order to handle the basic requirements here?

Basics

In this design the processing of these work items happens outside the database. We can structure the design of this queue processor around the following pseudocode:

// sample queue worker pseudocode

dbh = connect_to_postgres()
while (1) {
    dbh->begin()
    rows = get_batch()
    do_something_with_rows(rows)
    delete_old_rows(rows)
    dbh->commit()
}

Work storage for queuing is trivial is PostgreSQL; we just use a basic table with the items to be processed. Since we want data to be persistent (i.e., don't want to lose our work jobs if the server crashed in the middle of processing) we will want to use a normal table instead of an UNLOGGED table or other sorts of optimization.

Adding new tasks to the queue will be accomplished via just straight INSERT statements. Once we handle the external processing, we will remove the rows using DELETE statements. These transactional semantics are easy enough to adapt to our desired format.

Here is an example simple schema for our table:

CREATE TABLE queue_table (
    id int not null primary key generated always as identity,
    queue_time	timestamptz default now(),
    payload	text
);
INSERT INTO queue_table (payload) SELECT 'data-' || text(generate_series(1,1000));
CREATE TABLE
INSERT 0 1000

Trickier stuff

Say we want to process batches of 10 with worker processes, but we want to ensure that we don't double-process items. The naïve approach here is to do a SELECT * FROM queue_table LIMIT 10. However, this could result in multiple workers getting the same list of items to process. This is against our requirements per the definition, and in practice would be bad. (Some systems can be designed to deal with message repeats, but by definition, this is not one of them.)

So what to do?

Row locks to the rescue

Using row locks (a la SELECT FOR UPDATE), we can lock the rows that we are handling in a particular backend. Our query then looks like:

SELECT * FROM queue_table LIMIT 10 FOR UPDATE;

Let's test it out in two backends. Since row locks only last for the duration of a transaction, we will need to run this query inside each backend in a concurrent transaction.

-- backend 1
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE;
BEGIN
id	queue_time	payload
1	2021-08-31 10:36:34.872794-05	data-1
2	2021-08-31 10:36:34.872794-05	data-2
3	2021-08-31 10:36:34.872794-05	data-3
4	2021-08-31 10:36:34.872794-05	data-4
5	2021-08-31 10:36:34.872794-05	data-5
6	2021-08-31 10:36:34.872794-05	data-6
7	2021-08-31 10:36:34.872794-05	data-7
8	2021-08-31 10:36:34.872794-05	data-8
9	2021-08-31 10:36:34.872794-05	data-9
10	2021-08-31 10:36:34.872794-05	data-10
-- backend 2
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE;
BEGIN
<hangs>

SKIP LOCKED

As you can see, the second backend hangs. Fortunately, PostgreSQL supports skipping already locked rows and can return the "next" batch of rows in the queue table. Let's adjust our query as follows:

SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;

And our subsequent test shows us selecting the concurrent batches that do not overlap:

-- backend 1
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;
BEGIN
id	queue_time	payload
1	2021-08-31 10:36:34.872794-05	data-1
2	2021-08-31 10:36:34.872794-05	data-2
3	2021-08-31 10:36:34.872794-05	data-3
4	2021-08-31 10:36:34.872794-05	data-4
5	2021-08-31 10:36:34.872794-05	data-5
6	2021-08-31 10:36:34.872794-05	data-6
7	2021-08-31 10:36:34.872794-05	data-7
8	2021-08-31 10:36:34.872794-05	data-8
9	2021-08-31 10:36:34.872794-05	data-9
10	2021-08-31 10:36:34.872794-05	data-10
-- backend 2
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;
id	queue_time	payload
11	2021-08-31 10:36:34.872794-05	data-11
12	2021-08-31 10:36:34.872794-05	data-12
13	2021-08-31 10:36:34.872794-05	data-13
14	2021-08-31 10:36:34.872794-05	data-14
15	2021-08-31 10:36:34.872794-05	data-15
16	2021-08-31 10:36:34.872794-05	data-16
17	2021-08-31 10:36:34.872794-05	data-17
18	2021-08-31 10:36:34.872794-05	data-18
19	2021-08-31 10:36:34.872794-05	data-19
20	2021-08-31 10:36:34.872794-05	data-20

Next steps

So now we have a solution that works to SELECT individual batches of rows and hand them out to multiple backends. So how to we deal with processing the rows and marking them done?

Let's review our pseudocode:

// sample queue worker pseudocode

dbh = connect_to_postgres()
while (1) {
    dbh->begin()
    rows = get_batch()
    do_something_with_rows(rows)
    delete_old_rows(rows)
    dbh->commit()
}

As you can see from this illustration, we have implemented the get_batch() functionality. The do_something_with_rows() routine would be application-specific handling/processing of the underlying work item. But how do we delete the rows in question?

Cleanup

If our table has a Primary Key (which good database design dictates that you should always have), then we could use it to DELETE the rows we just selected from the batch. However, there are some potential tradeoffs/issues with this approach:

  1. Statements would be built up based on returned data. If you were doing DELETE FROM queue_table WHERE id = ? statement for each row, this would become more and more inefficient if you change the batch size from 10 to 100, for example.
  2. Issuing individual DELETE statements isn't good, but if you try to improve this by using batch queries you would need to consider the number of rows returned, not just the basic batch size. If you expected to process 10 rows each time, but only had 3 returned, you would need to prepare a DELETE statement that deleted only 3 records with associated IDs; that, or use a language which supported array bindings and pass the ids in that way.

Either way, we are increasing the complexity when we don't need to; we already know which rows we want to delete: they are the exact ones that we previously SELECT-ed.

There's got to be a better way!

Hmm, let's think. The DELETE statement is able to return data to us as if it were a SELECT statement via the RETURNING clause, however DELETE lacks a LIMIT clause, nor can we take row locks on it explicitly. That said, we can use the USING clause to join to itself and get both limits and row locks.

This gives us a final query as follows:

DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;

We are using the self-join method of deletion here with a subquery to both lock the underlying rows being returned, as well as deleting the rows and returning all values for the set of rows as if this were an original SELECT statement.

-- backend 1
BEGIN;
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
BEGIN
id	queue_time	payload
1	2021-08-31 10:36:34.872794-05	data-1
2	2021-08-31 10:36:34.872794-05	data-2
3	2021-08-31 10:36:34.872794-05	data-3
4	2021-08-31 10:36:34.872794-05	data-4
5	2021-08-31 10:36:34.872794-05	data-5
6	2021-08-31 10:36:34.872794-05	data-6
7	2021-08-31 10:36:34.872794-05	data-7
8	2021-08-31 10:36:34.872794-05	data-8
9	2021-08-31 10:36:34.872794-05	data-9
10	2021-08-31 10:36:34.872794-05	data-10
DELETE 10
COMMIT
-- backend 2
BEGIN;
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
BEGIN
id	queue_time	payload
11	2021-08-31 10:36:34.872794-05	data-11
12	2021-08-31 10:36:34.872794-05	data-12
13	2021-08-31 10:36:34.872794-05	data-13
14	2021-08-31 10:36:34.872794-05	data-14
15	2021-08-31 10:36:34.872794-05	data-15
16	2021-08-31 10:36:34.872794-05	data-16
17	2021-08-31 10:36:34.872794-05	data-17
18	2021-08-31 10:36:34.872794-05	data-18
19	2021-08-31 10:36:34.872794-05	data-19
20	2021-08-31 10:36:34.872794-05	data-20
DELETE 10

As long as the transaction stays open until we are done with the processing of the work items, other similar worker processes will not get assigned those work items, and will instead retrieve ones which are not currently locked. If this worker process is not able to handle its batch of items, it merely needs to ROLLBACK (or to have the application abort) and the original work_items will be returned to the queue. As long as the worker obeys the BEGIN, DELETE RETURNING, <process>, COMMIT process, you have a performant, non-blocking, independent queue manager built using only native PostgreSQL features.

Additionally, since we are using DELETE, we know that autovacuum should kick in and clean this table up periodically. If needed, we could tune the storage parameters for this queue table to ensure that the database itself handles this sort of cleanup.

Caveats

One of the considerations when using PostgreSQL (or any RDBMS with MVCC support) for high turnover queuing is table bloat. If you were using this technique in a production setting, you would need to monitor table bloat using a tool such as pg_bloat_check (a part of pgmonitor) and then tune autovacuum settings appropriately. Even then, you may need to occasionally rotate your queue table in order to deal with table bloat.

Future improvements

Since many queue systems have support for than a basic FIFO system you could enhance our system here by adding fields to our basic queue_table, such as topics, priorities, etc. The same basic recipe would be the same, but we would not need to change much about the worker itself other than the sub-SELECT used to identify the rows in question. Everything else about the system could be the same.

This is a powerful model that PostgreSQL itself handles with basic primitives. While special-case tools often have their niche, it is interesting to see what we can accomplish using the database itself.