Syncing Postgres Partitions to Your Data Lake in Crunchy Bridge for Analytics
One of the unique characteristics of the recently launched Crunchy Bridge for Analytics is that it is effectively a hybrid between a transactional and an analytical database system. That makes it a powerful tool for data-intensive applications that may, for example, require a combination of low-latency, high-throughput insertion, efficient lookup of recent data, and fast interactive analytics over historical data.
A common source of large data volumes is append-mostly time series data or event data generated by an application. PostgreSQL has various tools to optimize your database for time series, such as partitioning, BRIN indexes, and time functions, and its native heap storage format is well suited to bulk writes. However, there is a limit to what PostgreSQL can do with large data volumes, especially in terms of analytical query performance on large data sets and the operational overhead of storing a large amount of historical data in your database.
Bridge for Analytics solves this problem in two ways, in addition to everything PostgreSQL can already do:
- You can easily copy (or transform & copy) data into your data lake for cheaper storage and access by other applications
- You can create tables to efficiently run analytical queries on your data lake through a vectorized query engine and caching on NVMe drives
This blog post describes an end-to-end solution for storing recent event data in PostgreSQL using time partitioning, copying those time partitions into your data lake, and running fast analytical queries, all on the same Bridge for Analytics instance.
Setting up a time-partitioned table for fast writes and lookup queries
When dealing with a large stream of events, it is almost always a good idea to create a partitioned table using pg_partman and to use pg_cron for periodic maintenance. Partitioning by time helps to efficiently drop old data, and it typically improves the performance of both queries and inserts by keeping indexes small and by avoiding bloat (gaps left by deleted rows) and fragmentation (rows that are frequently retrieved together getting scattered through space reuse).
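To make the routing concrete, here is a small Python model (not PostgreSQL or pg_partman code) of how a daily range-partitioned table assigns a row to a partition. Each partition covers a half-open range [day start, next day start), so a row at exactly midnight belongs to the new day. The sketch assumes UTC timestamps, and the partition name format is a hypothetical placeholder rather than a claim about pg_partman's naming scheme.

```python
from datetime import datetime, timedelta, timezone

def daily_partition_bounds(event_time: datetime) -> tuple[datetime, datetime]:
    """Return the half-open [start, end) range of the daily partition for event_time."""
    start = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)

def partition_name(parent: str, event_time: datetime) -> str:
    """Hypothetical naming scheme, for illustration only."""
    start, _end = daily_partition_bounds(event_time)
    return f"{parent}_p{start:%Y%m%d}"

if __name__ == "__main__":
    t = datetime(2024, 1, 1, 23, 59, 59, tzinfo=timezone.utc)
    print(partition_name("requests", t))                         # requests_p20240101
    print(partition_name("requests", t + timedelta(seconds=1)))  # requests_p20240102
```

The half-open upper bound is what lets consecutive daily partitions tile time without overlaps or gaps.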
On Crunchy Bridge, you can connect to your Bridge for Analytics cluster as the postgres superuser and set up pg_partman and pg_cron:
-- Run the following as superuser
CREATE SCHEMA IF NOT EXISTS partman;
CREATE EXTENSION IF NOT EXISTS pg_partman WITH SCHEMA partman;
GRANT ALL ON SCHEMA partman TO application;
GRANT ALL ON ALL TABLES IN SCHEMA partman TO application;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA partman TO application;
GRANT EXECUTE ON ALL PROCEDURES IN SCHEMA partman TO application;
CREATE EXTENSION IF NOT EXISTS pg_cron;
GRANT USAGE ON SCHEMA cron TO application;
Now we'll create a table to capture requests to a website or API. We might want to load such data in batches from recent logs, or do an insert for individual requests from our application. We keep up to 7 days of data to power a dashboard with recent user activity, and to help in debugging issues.
-- Create a time-partitioned table
DROP TABLE IF EXISTS requests;
CREATE TABLE requests (
    event_time timestamptz not null default now(),
    event_id bigint generated always as identity,
    request_type text not null,
    url text,
    response_code int,
    response_time double precision,
    tenant_id bigint,
    user_tag text,
    session_tag text
)
PARTITION BY RANGE (event_time);
CREATE INDEX ON requests USING brin (event_time);
CREATE INDEX ON requests (tenant_id, user_tag);
-- Set up automatic partitioning by day
SELECT partman.create_parent(
p_parent_table := 'public.requests',
p_control := 'event_time',
p_interval := '1 day',
p_start_partition := '2024-01-01',
p_default_table := false
);
-- Drop tables older than a week when running pg_partman maintenance
UPDATE partman.part_config SET retention_keep_table = false, retention = '1 week'
WHERE parent_table = 'public.requests';
-- Regularly run pg_partman maintenance
SELECT cron.schedule('partman-maintenance', '@daily',
$$CALL partman.run_maintenance_proc()$$);
This setup will automatically create new partitions for incoming data as time progresses and drop partitions once they are older than a week. In the meantime, we can do fast lookups of rows using indexed columns and time-range filters.
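The retention setting can be modeled the same way. The Python sketch below is a simplified model, not pg_partman's actual logic. It assumes a daily partition is dropped once its entire range lies before today minus the retention interval. pg_partman compares against the current time and its exact boundary check may differ in detail.

```python
from datetime import date, timedelta

def partitions_to_drop(partition_days: list[date], today: date,
                       retention: timedelta = timedelta(weeks=1)) -> list[date]:
    """Start days of daily partitions lying entirely before today - retention
    (simplified rule; pg_partman's exact boundary check may differ)."""
    cutoff = today - retention
    # A daily partition starting on `day` covers [day, day + 1 day).
    return [day for day in partition_days if day + timedelta(days=1) <= cutoff]

if __name__ == "__main__":
    today = date(2024, 1, 15)
    days = [date(2024, 1, d) for d in range(1, 16)]
    dropped = partitions_to_drop(days, today)
    print(dropped[0], dropped[-1], len(dropped))  # 2024-01-01 2024-01-07 7
```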
Copying data into a lake analytics table for cheap storage and fast analytics
You may want to keep your data longer than 7 days on cheaper storage, and also perform fast, interactive analytics. For this, we'll create a crunchy_lake_analytics table, which is backed by compressed Parquet files in S3. Parquet is a columnar file format that is optimized for analytics.
-- Create an analytics table
CREATE FOREIGN TABLE requests_analytics (
    event_time timestamptz not null,
    event_id bigint not null,
    request_type text not null,
    url text,
    response_code int,
    response_time double precision,
    tenant_id bigint,
    user_tag text,
    session_tag text
)
SERVER crunchy_lake_analytics
OPTIONS (path 's3://mybucket/requests/*.parquet');
When we talk about partitioning, you might be thinking of a model where old partitions rotate into S3 (“tiering”), so the overall partitioned table becomes a mixture of recent data in heap partitions and older data in Parquet partitions. However, we do not recommend such a model for analytics tables, because it has several downsides:
- Each storage format targets a different type of query, and running both types of queries on the same mixed table can give the worst of both worlds.
- Fast analytics on columnar data is only available for very old data, while you might want it for relatively recent data too.
- You may want to apply some transformation before writing to cold storage, meaning older data follows a different schema.
- Updates / late inserts on older partitions become hard to handle if the data is converted to Parquet.
- Aggregates cannot always be performed efficiently on partitioned tables.
Hence, we prefer an alternative model: a crunchy_lake_analytics table exists alongside the source table, and we copy data over as soon as we stop receiving new data for a partition, while still keeping that partition in the time-partitioned heap table for as long as needed. The data in the lake is effectively still time-partitioned (one file per partition) and keeps the same benefits, but the partitioning is hidden from PostgreSQL.
This approach creates some redundancy where recent data is stored a second time in compressed form in S3. That's a small price to pay for getting a lot more flexibility and analytics performance. For instance, if at some point we get inserts from a few days ago or need to perform an update, then we can simply modify the heap partition and re-copy the data.
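Because each partition is copied to its own file, re-copying after a late insert replaces that one file rather than appending duplicates. The Python model below illustrates that property with a dict standing in for the S3 bucket. It is a simplified illustration, not how Bridge for Analytics stores or lists files. It assumes that copying to an existing URL replaces the object. The URL layout mirrors the one built in sync_partition below, and the partition names are hypothetical.

```python
def copy_partition(lake: dict[str, list[tuple]], partition: str, rows: list[tuple]) -> str:
    """Write all rows of a partition to one file, replacing any previous copy."""
    url = f"s3://mybucket/requests/{partition}.parquet"
    lake[url] = list(rows)  # assumption: re-copying overwrites the existing object
    return url

def scan_lake(lake: dict[str, list[tuple]]) -> list[tuple]:
    """Read every file, like a table defined over s3://mybucket/requests/*.parquet."""
    return [row for url in sorted(lake) for row in lake[url]]

if __name__ == "__main__":
    lake: dict[str, list[tuple]] = {}
    heap = {"public.requests_p20240101": [(1, "get"), (2, "get")]}
    copy_partition(lake, "public.requests_p20240101", heap["public.requests_p20240101"])
    # A late insert arrives for the same day: modify the heap partition, then re-copy.
    heap["public.requests_p20240101"].append((3, "post"))
    copy_partition(lake, "public.requests_p20240101", heap["public.requests_p20240101"])
    print(len(scan_lake(lake)))  # 3, not 5
```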
Copying data into your analytics table
We need a way to copy data from our time-partitioned heap table into our analytics table, for which we can use the COPY command. Let's define a function that copies a partition into a Parquet file in S3, using the default snappy compression.
-- Create a function that copies a partition into a Parquet file in the data lake
CREATE FUNCTION public.copy_partition_to_data_lake(partition_name_p regclass,
                                                   url_p text)
RETURNS void LANGUAGE plpgsql SET search_path TO '' AS $function$
BEGIN
  EXECUTE format($$
    COPY (SELECT * FROM %s) TO %L WITH (format 'parquet')
  $$, partition_name_p, url_p);
END;
$function$;
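The function builds the COPY statement dynamically. %s inserts the regclass, which PostgreSQL renders as a properly quoted name. %L quotes the URL as a string literal, so a URL containing a quote cannot break the statement. The Python sketch below mimics PostgreSQL's quote_literal rules: it doubles single quotes and backslashes, and adds an E prefix when a backslash is present. It shows the resulting command text as an illustration only, not a replacement for format(). The partition name in the example is hypothetical.

```python
def quote_literal(value: str) -> str:
    """Mimic PostgreSQL's quote_literal: double ' and \\, prefix E if a backslash is present."""
    doubled = value.replace("\\", "\\\\").replace("'", "''")
    prefix = "E" if "\\" in value else ""
    return f"{prefix}'{doubled}'"

def copy_command(partition: str, url: str) -> str:
    """Build the same COPY text as the function's format() call (whitespace aside).
    `partition` is assumed to be an already-quoted name, as regclass output is."""
    return f"COPY (SELECT * FROM {partition}) TO {quote_literal(url)} WITH (format 'parquet')"

if __name__ == "__main__":
    print(copy_command("public.requests_p20240101",
                       "s3://mybucket/requests/public.requests_p20240101.parquet"))
```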
It may be helpful to do a small bit of bookkeeping for the data we copied. That way we can easily make sure not to skip any partitions in case of failure, and skip copying the data if copying has already succeeded.
-- Create a table for tracking which partitions have been synced to S3
CREATE TABLE synced_partitions (
partition_name regclass primary key
);
-- Create a function that ensures a partition is synced to S3
CREATE FUNCTION public.sync_partition(partition_name_p regclass)
RETURNS void LANGUAGE plpgsql SET search_path TO '' AS $function$
DECLARE
  /* construct the URL for a given partition */
  url text := format('s3://mybucket/requests/%1$s.parquet', partition_name_p);
BEGIN
  /* skip if we already synced to S3 */
  IF NOT EXISTS (SELECT 1 FROM public.synced_partitions WHERE partition_name = partition_name_p) THEN
    /* remember that we synced */
    INSERT INTO public.synced_partitions VALUES (partition_name_p);
    /* copy data to S3 */
    PERFORM public.copy_partition_to_data_lake(partition_name_p, url);
    /* invalidate the NVMe cache */
    PERFORM crunchy_file_cache.remove(url);
  END IF;
END;
$function$;
-- Ensure all partitions from before today are periodically copied
-- into the analytics table
SELECT cron.schedule('sync-partitions', '@hourly', $$
SELECT
public.sync_partition(format('%I.%I', partition_schemaname, partition_tablename))
FROM
partman.show_partitions('public.requests'),
partman.show_partition_info(format('%I.%I', partition_schemaname, partition_tablename))
WHERE
child_end_time::timestamptz <= current_date;
$$);
This gives a fully automated system for reliably copying incoming data into our data lake.
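The reliability properties of the bookkeeping can be exercised outside PostgreSQL. The sketch below uses Python's built-in sqlite3 as a stand-in database, with caller-supplied callbacks in place of COPY and crunchy_file_cache.remove. As in the plpgsql function, the bookkeeping row and the copy happen in one transaction. A failed copy therefore leaves no row behind, and the next run retries. The re-sync step at the end is a hypothetical way to re-copy after late inserts, not something the setup above does automatically.

```python
import sqlite3
from typing import Callable

def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the synced_partitions bookkeeping table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE synced_partitions (partition_name TEXT PRIMARY KEY)")
    return conn

def sync_partition(conn: sqlite3.Connection, partition: str,
                   copy: Callable[[str, str], None],
                   invalidate: Callable[[str], None]) -> bool:
    """Copy a partition at most once; return True if a copy was performed."""
    url = f"s3://mybucket/requests/{partition}.parquet"
    with conn:  # commit on success, roll back if copy() or invalidate() raises
        already = conn.execute(
            "SELECT 1 FROM synced_partitions WHERE partition_name = ?", (partition,)
        ).fetchone()
        if already:
            return False
        conn.execute("INSERT INTO synced_partitions VALUES (?)", (partition,))
        copy(partition, url)   # stands in for COPY ... TO url
        invalidate(url)        # stands in for crunchy_file_cache.remove(url)
    return True

if __name__ == "__main__":
    conn = make_db()
    copied: list[str] = []
    record = lambda partition, url: copied.append(url)
    print(sync_partition(conn, "p1", record, lambda url: None))  # True
    print(sync_partition(conn, "p1", record, lambda url: None))  # False: already synced
    # Hypothetical re-sync after a late insert: forget the partition, then sync again.
    with conn:
        conn.execute("DELETE FROM synced_partitions WHERE partition_name = ?", ("p1",))
    print(sync_partition(conn, "p1", record, lambda url: None))  # True
    print(len(copied))  # 2
```

Retrying is safe here because each partition maps to exactly one file, so redoing a copy replaces rather than duplicates data.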
Running fast analytical queries on our historical data
To try our setup, let's insert some synthetic data (30M rows):
INSERT INTO
requests (event_time, request_type, response_time, response_code, url, tenant_id, user_tag, session_tag)
SELECT
current_date - interval '5 days' * random(), 'get', random(), 0, 'https://app.com/'||md5(random()::text), s % 100, md5((s % 1000000)::text), md5(random()::text)
FROM
generate_series(1,30000000) s;
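Here is a quick check of what this INSERT generates, computed in Python rather than PostgreSQL. generate_series(1,30000000) yields 30 million rows. tenant_id = s % 100 gives 100 tenants, and user_tag = md5((s % 1000000)::text) gives one million distinct users. The user_tag used in the lookup queries further down, 'a87ff679a2f3e71d9181a67b7542122c', is md5('4'). It therefore matches the 30 rows where s % 1000000 = 4, and all of those rows also have tenant_id = 4.

```python
import hashlib

N_ROWS = 30_000_000   # generate_series(1, 30000000)
N_TENANTS = 100       # tenant_id = s % 100
N_USERS = 1_000_000   # user_tag = md5((s % 1000000)::text)

def md5_hex(text: str) -> str:
    """Same hex digest that PostgreSQL's md5(text) returns for ASCII input."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()

def rows_for_user(user: int) -> list[int]:
    """Series values s in [1, N_ROWS] with s % N_USERS == user (for 0 < user < N_USERS)."""
    return list(range(user, N_ROWS + 1, N_USERS))

if __name__ == "__main__":
    assert md5_hex("4") == "a87ff679a2f3e71d9181a67b7542122c"
    matches = rows_for_user(4)
    print(len(matches))                      # 30
    print({s % N_TENANTS for s in matches})  # {4}
    print(N_ROWS // N_TENANTS)               # 300000 rows per tenant
```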
We could wait for the hourly sync, but we can also sync immediately. We use the pg_partman functions show_partitions and show_partition_info to find partitions whose range ends at or before the start of today.
SELECT
sync_partition(format('%I.%I', partition_schemaname, partition_tablename))
FROM
partman.show_partitions('public.requests'),
partman.show_partition_info(format('%I.%I', partition_schemaname, partition_tablename))
WHERE
child_end_time::timestamptz <= current_date;
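The WHERE clause keeps only partitions whose upper bound is at or before the start of today. These are partitions that no longer receive new data under normal operation. This matters because pg_partman also creates partitions ahead of time. Below is a small Python model of that filter, with partitions represented as hypothetical (name, start, end) tuples rather than pg_partman's actual output columns.

```python
from datetime import date, timedelta

Partition = tuple[str, date, date]  # hypothetical (name, range start, range end)

def complete_partitions(partitions: list[Partition], today: date) -> list[str]:
    """Names of partitions whose [start, end) range ends at or before today,
    mirroring the filter child_end_time::timestamptz <= current_date."""
    return [name for name, _start, end in partitions if end <= today]

def daily_partitions(first: date, count: int) -> list[Partition]:
    """Build `count` consecutive daily partitions starting at `first`."""
    days = [first + timedelta(days=i) for i in range(count)]
    return [(f"requests_p{d:%Y%m%d}", d, d + timedelta(days=1)) for d in days]

if __name__ == "__main__":
    today = date(2024, 1, 10)
    # Two past days, today, and one premade future partition.
    parts = daily_partitions(date(2024, 1, 8), 4)
    print(complete_partitions(parts, today))  # ['requests_p20240108', 'requests_p20240109']
```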
Now we are ready to run analytical queries and can compare running them on the partitioned heap table vs. the analytics table:
-- Simple analytical query on time-partitioned heap table (slow)
SELECT url, count(*) FROM requests GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
...
Time: 22525.851 ms
-- Simple analytical query on analytics table reading from data lake (fast)
SELECT url, count(*) FROM requests_analytics GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
...
Time: 2301.106 ms
-- A bit later: data files are cached on nvme drive (really fast)
SELECT url, count(*) FROM requests_analytics GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
...
Time: 494.507 ms
-- Lookup query on time-partitioned heap table (fast)
SELECT * FROM requests WHERE tenant_id = 4 AND user_tag = 'a87ff679a2f3e71d9181a67b7542122c' ORDER BY event_time DESC LIMIT 10;
...
Time: 20.084 ms
-- Lookup query on analytics table (slow-ish)
SELECT * FROM requests_analytics WHERE tenant_id = 4 AND user_tag = 'a87ff679a2f3e71d9181a67b7542122c' ORDER BY event_time DESC LIMIT 10;
...
Time: 121.213 ms
As you can see, both time-partitioned heap tables and analytics tables have merits. The heap tables offer fast insertion and fast lookups using indexes, while analytics tables are an order of magnitude or more faster for analytical queries (roughly 10x uncached and 45x cached in this test), which prepares you for dealing with very large amounts of historical data.
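The relative speeds can be computed directly from the timings above. Below is a short Python calculation using the measured values from this post.

```python
import math

timings_ms = {
    "heap_aggregate": 22525.851,
    "lake_aggregate_uncached": 2301.106,
    "lake_aggregate_cached": 494.507,
    "heap_lookup": 20.084,
    "lake_lookup": 121.213,
}

def ratio(slow: str, fast: str) -> float:
    """How many times faster the `fast` query ran than the `slow` one."""
    return timings_ms[slow] / timings_ms[fast]

if __name__ == "__main__":
    for fast in ("lake_aggregate_uncached", "lake_aggregate_cached"):
        r = ratio("heap_aggregate", fast)
        print(f"{fast}: {r:.1f}x faster ({math.log10(r):.2f} orders of magnitude)")
    print(f"heap lookup: {ratio('lake_lookup', 'heap_lookup'):.1f}x faster than lake lookup")
```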
In this case, we made sure that the two tables had identical contents in order to compare performance. In practice, you might also want to normalize, filter, or scrub your data when copying it into the analytics table, which can give you additional speedups.
Get started with Crunchy Bridge for Analytics
Bridge for Analytics helps you query your existing data in your data lake from PostgreSQL, but also supports hybrid workloads where you combine heap tables and analytics tables to handle more advanced, data-intensive workloads.
You can get started with Bridge for Analytics with a few clicks once you’re signed up to Crunchy Bridge. You’ll get a fully managed PostgreSQL cluster with the additional extensions and other components that power analytics. Then, head over to the Analytics docs to find out more.