Incremental Archival from Postgres to Parquet for Analytics
Marco Slot
PostgreSQL is commonly used to store event data coming from various kinds of devices. The data often arrives as individual events or small batches, which requires an operational database to capture. Features like time partitioning help optimize the storage layout for time range filtering and efficient deletion of old data.
The PostgreSQL feature set gives you a lot of flexibility for handling a variety of IoT scenarios, but there are certain scenarios for which it is less suitable, namely:
- Long-term archival of historical data
- Fast, interactive analytics on the source data
Ideally, data would get automatically archived in cheap storage, in a format optimized for large analytical queries.
We developed two open source Postgres extensions that help you do that:
- pg_parquet can export (and import) query results to the Parquet file format in object storage using regular COPY commands
- pg_incremental can run a command for a never-ending series of time intervals or files, built on top of pg_cron
With some simple commands, you can set up a reliable, fully automated pipeline to export time ranges to the columnar Parquet format in S3.
Then, you can use a variety of analytics tools to query or import the data. My favorite is of course Crunchy Data Warehouse.
Exporting event data periodically to Parquet in S3
On any PostgreSQL server that has the pg_parquet and pg_incremental extensions, you can set up a pipeline that periodically exports data to S3 in two steps.
The pg_incremental extension has a create_time_interval_pipeline function that runs a given command once a time interval has passed, with two timestamp parameters set to the start and end of the interval. We cannot use query parameters directly in a COPY command, but we can define a simple PL/pgSQL function that generates and executes a custom COPY command using the parameters.
-- existing raw data table
create table events (
    event_id bigint not null generated by default as identity,
    event_time timestamptz not null default now(),
    device_id bigint not null,
    sensor_1 double precision
);
insert into events (device_id, sensor_1)
values (297, 20.4), (297, 20.4);
-- define an export function that wraps a COPY command
create or replace function export_events(start_time timestamptz, end_time timestamptz)
returns void language plpgsql as $function$
begin
    execute format(
        $$
        copy (select * from events where event_time >= %L and event_time < %L)
        to 's3://mybucket/events/%s.parquet' with (format 'parquet');
        $$,
        start_time, end_time, to_char(start_time, 'YYYY-MM-DD-HH24')
    );
end;
$function$;
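Before wiring this into a pipeline, you can optionally call the function by hand for a single hour to verify that your credentials and the target path work; the timestamps below are just illustrative.
-- optional: manually export one hour of data to verify the setup
select export_events('2025-01-01 00:00+00', '2025-01-01 01:00+00');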
-- export events hourly from the start of the year, and keep exporting in the future
select incremental.create_time_interval_pipeline('event-export',
    time_interval := '1 hour',     /* export data by the hour */
    batched := false,              /* process 1 hour at a time */
    start_time := '2025-01-01',    /* backfill from the start of the year */
    source_table_name := 'events', /* wait for writes on events to finish */
    command := $$ select export_events($1, $2) $$ /* run export_events with start/end times */
);
By running these commands, Postgres will export all data from the start of the year into hourly Parquet files in S3, keep doing so for each new hour, and automatically retry on failure.
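To make the parameter substitution concrete: for the hour starting at 14:00 UTC on January 1st, the export_events function effectively runs a COPY like the following (timestamps and file name shown for illustration).
-- what export_events generates for one hourly interval (illustrative)
copy (
    select * from events
    where event_time >= '2025-01-01 14:00:00+00' and event_time < '2025-01-01 15:00:00+00'
)
to 's3://mybucket/events/2025-01-01-14.parquet' with (format 'parquet');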
To use pg_parquet on Crunchy Bridge, you can add your S3 credentials for pg_parquet to your Postgres server via the dashboard under Settings -> Data lake.
Analytics on Parquet in S3
Once data is in Parquet, you can use a variety of tools and approaches to query the data. If you want to keep using Postgres, you can use Crunchy Data Warehouse which has two different ways of working with Parquet data.
Querying Parquet directly using a lake analytics table
The simplest way to start querying Parquet files in S3 in Crunchy Data Warehouse is to use a lake analytics table. You can easily create a table for all Parquet files that match a wildcard pattern:
create foreign table events_parquet ()
server crunchy_lake_analytics
options (path 's3://mybucket/events/*.parquet');
You can then immediately query the data and the files get cached in the background, so queries will quickly get faster.
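For example, a typical aggregation over the archived events might look like this (the date range is illustrative):
-- average sensor value per device over one week of archived events
select device_id, avg(sensor_1)
from events_parquet
where event_time >= '2025-01-01' and event_time < '2025-01-08'
group by 1;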
A downside of querying Parquet directly is that eventually we will have a lot of hourly files that match the pattern, and there will be some overhead from listing them for each query (listing is not cached). We also cannot easily change the schema later.
Automatically importing files into an Iceberg table
A more flexible approach is to import the Parquet files into an Iceberg table. Iceberg tables are also backed by Parquet in S3, but the files are compacted and optimized, and the table supports transactions and schema changes.
You can create an Iceberg table that has the same schema as a set of Parquet files using the definition_from option. You could also load the data using load_from, but we’ll do that separately.
create table events_iceberg () using iceberg
with (definition_from = 's3://mybucket/events/*.parquet');
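For reference, if you did want to create the table and load the existing files in a single step, the load_from variant would look roughly like this; we skip it here because the pipeline below takes care of the import.
-- alternative: create the Iceberg table and load all matching files in one go
create table events_iceberg () using iceberg
with (load_from = 's3://mybucket/events/*.parquet');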
Now we need a way to import all existing Parquet files and also import new files that show up in S3 into Iceberg. This is another job for pg_incremental. Following a similar approach as before, we create a function to generate a COPY command using a parameter.
-- define an import function that wraps a COPY command to import from a URL
create function import_events(path text)
returns void language plpgsql as $function$
begin
    execute format($$copy events_iceberg from %L$$, path);
end;
$function$;
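You can also call the function by hand against a single exported file to check that the import works; the file name below is just an example of what the hourly export produces.
-- optional: manually import one Parquet file into the Iceberg table
select import_events('s3://mybucket/events/2025-01-01-14.parquet');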
-- create a pipeline to import new files into a table, one by one.
-- $1 will be set to the path of a new file
select incremental.create_file_list_pipeline('event-import',
    file_pattern := 's3://mybucket/events/*.parquet',
    list_function := 'crunchy_lake.list_files',
    command := $$ select import_events($1) $$
);
-- optional: do compaction immediately
vacuum events_iceberg;
After running these commands, your data will be continuously archived from your source Postgres server into Iceberg in S3. You can then run fast analytical queries directly from Crunchy Data Warehouse, which uses a combination of parallel, vectorized query processing and file caching to speed up queries. You can additionally set up (materialized) views and assign read permissions to the relevant users.
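As a small sketch of that last step, with a hypothetical rollup view and reporting role (both names are made up for illustration):
-- hypothetical example: expose an hourly rollup to a reporting role
create materialized view device_hourly as
select device_id, date_trunc('hour', event_time) as hour, avg(sensor_1) as avg_sensor_1
from events_iceberg
group by 1, 2;
grant select on device_hourly to reporting_role;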
No complex ETL pipelines required.
Analytics performance comparison
To give you a sense of the performance benefit of using Parquet, we loaded 100M rows into the source table, which got automatically mirrored in Parquet and Iceberg via our pipelines. We then ran a simple analytical query on each table:
select device_id, avg(sensor_1) from events group by 1;
The runtimes in milliseconds are shown in the following chart:
In this case the source server is a standard-16 instance (4 vCPUs) on Crunchy Bridge, and the warehouse is a warehouse-standard-16 instance (4 vCPUs). So, using Crunchy Data Warehouse, we can analyze 100M rows in well under a second on a small machine, and get a >10x speedup with Iceberg.
Compression also means the data size went from 8.9GB in PostgreSQL to 1.2GB for Iceberg in object storage.
The simplicity of transactional ETL
With pg_parquet and pg_incremental, you can incrementally export data from PostgreSQL into Parquet in S3, and with Crunchy Data Warehouse you can process and analyze that data very quickly while still using PostgreSQL.
One of the nice characteristics of the approach described in this blog is that the pipelines are fully transactional: every import or export step either fully succeeds, or fails and is retried until it succeeds. That's how we can create production-ready pipelines with a few simple SQL commands.
Under the covers, pg_incremental keeps track of which time ranges or files have been processed. The bookkeeping happens in the same transaction as the COPY commands. So if a command fails because of an ephemeral S3 issue, the data will not end up being ingested twice or go missing. Having transactions takes away a huge amount of complexity for building reliable pipelines. There can of course be other reasons for pipeline failures that cannot be resolved through retries (e.g. changing data format), so it is still important to monitor your pipelines.
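Since pg_incremental schedules its work through pg_cron, one simple way to keep an eye on pipelines is to check pg_cron's run history for failed executions, for example:
-- recent failed job runs (pg_incremental pipelines are executed via pg_cron)
select jobid, command, status, return_message, start_time
from cron.job_run_details
where status = 'failed'
order by start_time desc
limit 20;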