pg_incremental: Incremental Data Processing in Postgres
Today I’m excited to introduce pg_incremental, a new open source PostgreSQL extension for automated, incremental, reliable batch processing. This extension helps you create processing pipelines for append-only streams of data, such as IoT / time series / event data workloads.
Notable pg_incremental use cases include:
- Creation and incremental maintenance of rollups, aggregations, and interval aggregations
- Incremental data transformations
- Periodic import or export of new data using standard SQL
After you set up a pg_incremental pipeline, it keeps running until you tell Postgres to stop. There’s a lot you can do with pg_incremental, and we have a lot of thoughts on why it’s valuable. To help you navigate, here are the sections of this post in case you want to jump directly to the example most relevant to you:
- Why incremental processing
- Practical data pipelines using parameterized SQL
- Example 1: Unpacking raw JSON data with a sequence pipeline
- Example 2: Complex aggregations with a time interval pipeline
- Example 3: Periodic export to Parquet in S3 with a time interval pipeline
- Example 4: Import new files with a file list pipeline
Why incremental processing?
My team has been working on handling data-intensive workloads in PostgreSQL for many years. The most data-intensive workloads are usually the ones with a machine-generated stream of event data, and we often find that the best solution for handling those workloads in PostgreSQL involves incremental data processing.
For example, a common pattern in PostgreSQL is to periodically pre-aggregate incoming event data into a summary table. In that model, writes (especially batch loads) are fast because they do not trigger any immediate processing. The incremental aggregation is fast because it only processes new rows, and queries from dashboards are fast because they hit an indexed summary table. I originally developed pg_cron for this purpose, but creating an end-to-end pipeline still required a lot of bookkeeping and careful concurrency considerations.
There are some existing solutions to this problem, such as incremental materialized views and logical decoding-based approaches, but the implementations are complex and come with many limitations. Moreover, there are other incremental processing scenarios, such as collecting data from multiple sources or periodic import/export. I also still hear from people about an old blog post I wrote on incremental data processing in PostgreSQL, so I know this topic remains unsolved for many Postgres users.
I felt it was time for a new incremental processing tool. One that isn't particularly magical - but is simple, versatile and gets the job done. That tool is pg_incremental.
Practical data pipelines using parameterized SQL
The basic idea behind pg_incremental is simple: You define a pipeline using a SQL command that is executed with parameters ($1, $2) that specify a range of values to be processed.
When you first define the pipeline, it executes the command with a range that covers all existing data, and also sets up a background job using pg_cron to periodically execute the command for new ranges. Every execution of the pipeline is transactional, such that each value is processed successfully exactly once. The dimension used to identify new data can be a sequence, time, or list of files.
Let’s think about a sample data aggregation pipeline:
- You have an indexed raw data table of events
- You have a summary table called view_counts that summarizes the data in your events table per day
- pg_incremental is used for incrementally upserting existing and new event data into view_counts
Sample code:
/* define the raw data and summary table */
create table events (event_id bigserial, event_time timestamptz, user_id bigint, response_time double precision);
create table view_counts (day timestamptz, user_id bigint, count bigint, primary key (day, user_id));
/* enable fast range scans on the sequence column */
create index on events using brin (event_id);
/* for demo: generate some random data */
insert into events (event_time, user_id, response_time)
select now(), random() * 100, random() from generate_series(1,1000000) s;
/* define a sequence pipeline that periodically upserts view counts */
select incremental.create_sequence_pipeline('view-count-pipeline', 'events',
  $$
    insert into view_counts
    select date_trunc('day', event_time), user_id, count(*)
    from events where event_id between $1 and $2
    group by 1, 2
    on conflict (day, user_id) do update set count = view_counts.count + EXCLUDED.count;
  $$
);
/* get the most active users of today */
select user_id, sum(count) from view_counts where day = now()::date group by 1 order by 2 desc limit 3;
┌─────────┬───────┐
│ user_id │ sum │
├─────────┼───────┤
│ 32 │ 20486 │
│ 77 │ 20404 │
│ 75 │ 20378 │
└─────────┴───────┘
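Before registering a command as a pipeline, it can help to try it by hand on a small range. One way to do that, sketched below with a standard PostgreSQL prepared statement, is to run just the select part of the command with explicit values for $1 and $2. The statement name view_count_batch and the range 1 to 1000 are made up for illustration; pg_incremental picks the actual ranges itself when the pipeline runs.

```sql
/* hypothetical dry run: prepare the select part of the pipeline command */
prepare view_count_batch (bigint, bigint) as
select date_trunc('day', event_time) as day, user_id, count(*)
from events where event_id between $1 and $2
group by 1, 2;

/* run it for a small, explicit range of sequence values */
execute view_count_batch(1, 1000);

/* clean up the prepared statement */
deallocate view_count_batch;
```

Because this only runs the select, it does not touch view_counts or the pipeline's own bookkeeping.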
A “sequence pipeline” takes advantage of sequence values in PostgreSQL being monotonically increasing with every insert. It is not normally safe to just start processing a range of sequence values, because there might be ongoing transactions that are about to insert lower sequence values. However, pg_incremental waits for those transactions to complete before processing a range, which guarantees that the range is safe.
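To see why a naive range scan is unsafe, consider the following interleaving of two sessions, written as comments around ordinary SQL. The session labels and the specific event_id values are illustrative assumptions, and the statements are meant to be replayed across two psql sessions rather than run as one script. The point is only that a lower sequence value can become visible after a higher one.

```sql
/* session A: starts a transaction and inserts, drawing (say) event_id 1000001 */
begin;
insert into events (event_time, user_id, response_time) values (now(), 1, 0.1);

/* session B: inserts and commits right away, drawing (say) event_id 1000002 */
insert into events (event_time, user_id, response_time) values (now(), 2, 0.2);

/* a naive processor scans everything up to the highest visible value:
   it sees 1000002 but not the uncommitted 1000001,
   and records 1000002 as the last processed value */
select max(event_id) from events;

/* session A: commits; event_id 1000001 is now visible, but a processor
   that only looks at values above 1000002 would never pick it up */
commit;
```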
Not every table has a sequence, and sometimes the source is not a table at all. Therefore, pg_incremental has 3 types of pipelines:
- Sequence pipelines can process ranges of new sequence values in small batches with upserts.
- Time interval pipelines can process data that falls within a time interval after the time interval has passed.
- File list pipelines (in preview) can process new files that appear in a directory.
Let's look at some more examples:
Example 1: Unpacking raw JSON data with a sequence pipeline
PostgreSQL has great JSON support, but I often run into scenarios where you need to unpack raw JSON data into the columns of a table to simplify querying or add indexes and constraints.
Below is an example of using a pg_incremental sequence pipeline to transform raw JSON. We create a table with a sequence and a JSONB column to load raw files directly using COPY. We then set up a pipeline that extracts relevant values from the new JSON objects, and inserts them into an events table with columns.
/* create a table with a single JSONB column and a sequence to track new objects */
create table events_json (id bigint generated by default as identity, payload jsonb);
create index on events_json using brin (id);
/* load some data from a local newline-delimited JSON file */
\copy events_json (payload) from '2024-12-15-00.json' with (format 'csv', quote e'\x01', delimiter e'\x02', escape e'\x01')
/* periodically unpack the new JSON objects into the events table */
select incremental.create_sequence_pipeline('unpack-json-pipeline', 'events_json',
  $$
    insert into events (event_id, event_time, user_id, response_time)
    select
      nextval('events_event_id_seq'),
      (payload->>'created_at')::timestamptz,
      (payload->'actor'->>'id')::bigint,
      (payload->>'response_time')::double precision
    from events_json
    where id between $1 and $2
  $$
);
After setting up the pipeline, future data loads into events_json will automatically be transformed and added to the events table.
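To sanity-check the extraction expressions before loading a real file, you could run them against a literal JSON object. The sample payload below is made up and only assumes the same three fields that the pipeline reads.

```sql
/* hypothetical sample object with the fields used by the pipeline */
select
  (payload->>'created_at')::timestamptz as event_time,
  (payload->'actor'->>'id')::bigint as user_id,
  (payload->>'response_time')::double precision as response_time
from (
  values ('{"created_at": "2024-12-15T00:00:00Z", "actor": {"id": 42}, "response_time": 0.25}'::jsonb)
) as sample (payload);
```

If a key is missing, the ->> operator returns null, so the cast produces a null column value rather than an error. Malformed values (for instance a non-numeric id) would still fail the cast and roll back that pipeline execution.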
Example 2: Complex aggregations with a time interval pipeline
A time interval pipeline runs after an interval has passed, once all the data in that interval is available. Compared to sequence pipelines, time interval pipelines are more suitable for aggregations that cannot be merged, such as exact distinct counts.
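To see why exact distinct counts cannot be merged across batches, here is a small self-contained query over made-up data. It compares the distinct count over all rows with the sum of per-batch distinct counts, where one user appears in both batches.

```sql
/* two hypothetical batches of user ids; user 11 appears in both */
with batches (batch, user_id) as (
  values (1, 10), (1, 11), (2, 11), (2, 12)
)
select
  (select count(distinct user_id) from batches) as exact_distinct,
  (select sum(batch_count)
   from (select count(distinct user_id) as batch_count
         from batches group by batch) per_batch
  ) as sum_of_batch_counts;
```

The exact distinct count is 3, while adding up the two per-batch counts gives 4, because user 11 is counted twice. A plain count(*), by contrast, can be summed across batches, which is why the upsert in the view_counts example works.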
Below is an example of using a pg_incremental time interval pipeline to aggregate the number of unique users in an hour into a user_counts table. The $1 and $2 parameters will be set to the start and end (exclusive) of a range of time intervals.
/* create a table for number of active users per hour */
create table user_counts (hour timestamptz, user_count bigint, primary key (hour));
/* enable fast range scans on the event_time column */
create index on events using brin (event_time);
/* aggregates a range of 1 hour intervals after an hour has passed */
select incremental.create_time_interval_pipeline('distinct-user-count', '1 hour',
  $$
    insert into user_counts
    select date_trunc('hour', event_time), count(distinct user_id)
    from events where event_time >= $1 and event_time < $2
    group by 1
  $$
);
/* get number of active users per hour */
select hour, user_count from user_counts order by 1;
A downside of time interval pipelines is that they do not process data with older timestamps if the corresponding interval has already been processed. By default, a time interval pipeline waits for 30 seconds after the interval. You can configure a higher min_delay and can also specify a source_table_name to wait for writers to finish.
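As a sketch of those two options, the variant below defines the hourly distinct count with a longer delay and a source table to wait on. The pipeline name and the 5 minute value are made up, and passing min_delay as a named argument is an assumption based on how source_table_name is passed elsewhere in this post.

```sql
/* alternative to the pipeline above (do not create both, since both
   would insert the same hours into user_counts) */
select incremental.create_time_interval_pipeline('distinct-user-count-delayed', '1 hour',
  $$
    insert into user_counts
    select date_trunc('hour', event_time), count(distinct user_id)
    from events where event_time >= $1 and event_time < $2
    group by 1
  $$,
  source_table_name := 'events',  /* wait for ongoing writes on events */
  min_delay := '5 minutes'        /* assumed named argument: extra wait after each hour ends */
);
```

A longer delay trades freshness for a lower chance of missing late-arriving rows; it does not help with rows that arrive after the chosen delay.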
Example 3: Periodic export to Parquet in S3 with a time interval pipeline
A common requirement with event data is to export into a remote storage system like S3, for instance using the pg_parquet extension.
Below is an example of using a pg_incremental time interval pipeline to export the data in the events table to one Parquet file per day, starting from January 1st, 2024, and then automatically each time a day has passed.
/* define a function that wraps a COPY TO command to export data */
create or replace function export_events(start_time timestamptz, end_time timestamptz)
returns void language plpgsql as $function$
begin
  /* select all rows in a time range and export them to a Parquet file */
  execute format(
    'copy (select * from events where event_time >= %L and event_time < %L) to %L',
    start_time, end_time, format('s3://mybucket/events/%s.parquet', start_time::date)
  );
end;
$function$;
/* export data as 1 file per day, starting at Jan 1st */
select incremental.create_time_interval_pipeline(
  'export-events',
  '1 day',
  'select export_events($1, $2)',
  source_table_name := 'events',  /* wait for writes on events to finish */
  batched := false,               /* separate execution for each day */
  start_time := '2024-01-01'      /* export all days from Jan 1st now */
);
In this case, I disabled “batching” of time intervals, such that time intervals are processed one at a time, starting from Jan 1st 2024. I also specified a source_table_name, which means the execution waits for any ongoing writes. If the event_time is generated via now(), this helps ensure we do not skip any rows.
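To make the per-day execution concrete, the sketch below shows the file name that export_events builds for the first day and the call the pipeline would make for it. Running the second statement by hand assumes pg_parquet is installed and S3 credentials for the example bucket are configured.

```sql
/* the object name that export_events builds for the first day */
select format('s3://mybucket/events/%s.parquet', '2024-01-01'::timestamptz::date);

/* the call the pipeline makes for that day; the end of the range is exclusive */
select export_events('2024-01-01', '2024-01-02');
```

With the default ISO DateStyle, the first query returns s3://mybucket/events/2024-01-01.parquet. Note that start_time::date is evaluated in the session time zone, so the file name follows the time zone of the session that runs the pipeline.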
Example 4: Import new files with a file list pipeline
One of the things that prompted me to write pg_incremental was that I found myself writing a script to incrementally process new files in S3 for a Crunchy Data Warehouse use case. I realized that processing new files in a directory has a lot in common with the other incremental processing scenarios, except that new data is found by listing files.
Below is an example of using a pg_incremental file list pipeline to import all files that match a wildcard and automatically load new files as they appear (in Crunchy Data Warehouse). The $1 parameter will be set to the path of a file that has not been processed yet, as returned by the underlying list function.
/* define function that wraps a COPY FROM command to import data */
create or replace function import_events(path text)
returns void language plpgsql as $function$
begin
  /* load a file into the events table */
  execute format('copy events from %L', path);
end;
$function$;
/* load all the files under a prefix, and automatically load new files, one at a time */
select incremental.create_file_list_pipeline(
  'import-events',
  's3://mybucket/events/*.csv',
  'select import_events($1)'
);
The list function is configurable via the list_function argument. For instance, you could wrap around the pg_ls_dir() function to load files on the server, or use a function that returns a synthetic range to load public (not listable) data.
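As a rough sketch of the pg_ls_dir() idea, the function below lists files in a server-side directory that match a simple wildcard. The function name list_local_files, the /data/inbox path, and the pipeline name are hypothetical. The signature that pg_incremental expects from a list function (here: one text pattern in, a set of rows with a path column out) is an assumption, so check the pg_incremental documentation before relying on it.

```sql
/* hypothetical list function: split the pattern into a directory and a
   file name wildcard, then list matching files in that directory */
create or replace function list_local_files(pattern text)
returns table (path text) language sql as $function$
  select d.dir || '/' || f.name
  from (select regexp_replace(pattern, '/[^/]*$', '') as dir) d,
       lateral pg_ls_dir(d.dir) as f(name)
  /* translate * into the LIKE wildcard %; note that _ also acts as a wildcard in LIKE */
  where f.name like replace(regexp_replace(pattern, '^.*/', ''), '*', '%')
  order by 1;
$function$;

/* assumed usage: pass the function via the list_function argument */
select incremental.create_file_list_pipeline(
  'import-local-events',
  '/data/inbox/*.csv',
  'select import_events($1)',
  list_function := 'list_local_files'
);
```

By default, pg_ls_dir() and server-side COPY FROM a file are restricted to superusers or roles that have been granted the relevant privileges, so this variant only suits setups where the pipeline owner has them.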
The API of the file list pipeline might still undergo small changes, hence it’s in preview.
Monitoring pg_incremental
You can see all your pipelines in the incremental.pipelines table and monitor the progress of your pipelines via the tables that pg_incremental uses to do its own bookkeeping, which contain the last processed value:
select * from incremental.sequence_pipelines ;
┌─────────────────────┬────────────────────────────┬────────────────────────────────┐
│ pipeline_name │ sequence_name │ last_processed_sequence_number │
├─────────────────────┼────────────────────────────┼────────────────────────────────┤
│ view-count-pipeline │ public.events_event_id_seq │ 3000000 │
└─────────────────────┴────────────────────────────┴────────────────────────────────┘
select * from incremental.time_interval_pipelines;
┌───────────────┬───────────────┬─────────┬───────────┬────────────────────────┐
│ pipeline_name │ time_interval │ batched │ min_delay │ last_processed_time │
├───────────────┼───────────────┼─────────┼───────────┼────────────────────────┤
│ export-events │ 1 day │ f │ 00:00:30 │ 2024-12-17 00:00:00+01 │
└───────────────┴───────────────┴─────────┴───────────┴────────────────────────┘
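Building on these bookkeeping tables, a backlog check could look like the sketch below. It uses only the columns shown in the output above, plus the built-in pg_sequence_last_value() function, and it assumes that sequence_name can be cast to regclass.

```sql
/* how far each sequence pipeline is behind the latest value handed out */
select pipeline_name,
       last_processed_sequence_number,
       pg_sequence_last_value(sequence_name::regclass) as last_sequence_value,
       pg_sequence_last_value(sequence_name::regclass)
         - last_processed_sequence_number as backlog
from incremental.sequence_pipelines;

/* how far each time interval pipeline is behind the current time */
select pipeline_name, time_interval, now() - last_processed_time as behind
from incremental.time_interval_pipelines;
```

For a time interval pipeline, being behind by up to one interval plus min_delay is expected, since an interval is only processed after it has passed.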
In addition, you can view the result of the underlying pg_cron jobs via the regular pg_cron tables.
select jobname, start_time, status, return_message
from cron.job_run_details join cron.job using (jobid)
where jobname like 'pipeline:event-import%' order by start_time desc limit 3;
┌───────────────────────┬───────────────────────────────┬───────────┬────────────────┐
│ jobname │ start_time │ status │ return_message │
├───────────────────────┼───────────────────────────────┼───────────┼────────────────┤
│ pipeline:event-import │ 2024-12-17 13:27:00.090057+01 │ succeeded │ CALL │
│ pipeline:event-import │ 2024-12-17 13:26:00.055813+01 │ succeeded │ CALL │
│ pipeline:event-import │ 2024-12-17 13:25:00.086688+01 │ succeeded │ CALL │
└───────────────────────┴───────────────────────────────┴───────────┴────────────────┘
Note that the jobs run more frequently than the pipeline command is executed. The command is skipped if there is no new work to do.
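To look for problems rather than successes, a similar query can filter on failed runs. This is a sketch that assumes all pipeline jobs follow the pipeline: naming shown above; the status value failed is the one pg_cron records for unsuccessful runs.

```sql
/* most recent failed runs across all pipelines */
select jobname, start_time, end_time - start_time as duration, return_message
from cron.job_run_details join cron.job using (jobid)
where jobname like 'pipeline:%' and status = 'failed'
order by start_time desc limit 10;
```

For failed runs, return_message typically carries the error text, which is usually enough to tell a bad cast in the pipeline command apart from a permissions or connectivity problem.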
Get started with incremental processing in PostgreSQL
Crunchy Data is proud to release pg_incremental under the PostgreSQL license. We believe it is a foundational building block for building IoT applications on PostgreSQL that should be available to everyone, similar to pg_cron, pg_parquet, and pg_partman.
You can find the code and documentation in the pg_incremental GitHub repo. Let us know if you have any feedback (we always appreciate a star!).
Starting today, pg_incremental is also available on Crunchy Bridge and Crunchy Data Warehouse.