Postgres + Citus + Partman, Your IoT Database
Postgres is a robust data platform. Yes, it's more than a boring old relational database. It has rich indexing, data types (including JSON), and so much more. It also supports a variety of extensions that can further broaden its already great functionality. Two of those extensions, when coupled together, make Postgres a very compelling approach for IoT architectures. Today we're going to start from the ground up on how you would design your architecture with Postgres along with the Citus and pg_partman extensions.
Citus and sharding
Citus is an extension that allows you to shard your database across multiple nodes, while allowing your application to remain largely unaware. Citus can be beneficial to your app if:
- You expect to outgrow the performance a single Postgres instance can deliver
- Your schema/data model can be mapped cleanly to Citus
- The queries/workload pattern can be mapped cleanly to Citus
Lucky for us, IoT workloads check all of the boxes above.
Starting with our IoT dataset
We're going to begin with a simple schema that relates to vehicles and tracks a few basic measurements against them. We'll also have a table that tracks the location of the vehicle at the time of each sensor sampling.
CREATE TABLE sensor_data (
id SERIAL,
car_id VARCHAR(17) NOT NULL,
sensor_type VARCHAR(20) NOT NULL,
sensor_value INT NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE TABLE location_data (
id SERIAL,
car_id VARCHAR(17) NOT NULL,
latitude float8,
longitude float8,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL
);
While the schema above is simple, it is representative of many IoT data models, though yours could be more complex.
How do we shard this dataset?
The key to sharding is that you can push down most of your joins to the node where the data is located. If you have to move data between nodes in order to join, your performance will suffer. For IoT workloads, a device_id is a very common choice for the sharding key. In our schema that is car_id.
To turn this into a sharded database with Citus installed we simply need to run:
SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');
By default Citus will co-locate rows for the same car_id across both tables, because the tables are sharded on the same value and have the same number of shards. Citus creates 32 shards by default, but this is configurable if you need more or fewer. It's worth noting that the number of shards is separate from the number of nodes/instances. In this case, if you had a Citus configuration of 1 coordinator and 2 workers, each worker would receive 16 shards of sensor_data and 16 of location_data.
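As a rough sketch of how you might change the shard count and check where shards landed: the statements below assume a reasonably recent Citus release that ships the citus_tables and citus_shards views, and the value 64 is only an illustrative choice, not a recommendation.

```sql
-- Assumption: set this BEFORE calling create_distributed_table;
-- 64 is an arbitrary example value
SET citus.shard_count = 64;

-- After distributing: tables that share a colocation_id are co-located
SELECT table_name, distribution_column, colocation_id, shard_count
FROM citus_tables;

-- Count how many shards of each table were placed on each worker
SELECT table_name, nodename, nodeport, count(*) AS shards
FROM citus_shards
GROUP BY table_name, nodename, nodeport
ORDER BY table_name, nodename, nodeport;
```

If sensor_data and location_data report the same colocation_id, joins on car_id between them can be pushed down to the workers.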
Now that the tables are sharded, we can see how Citus works. Let's run two different queries and see how Citus handles each:
SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1
ORDER BY 2 DESC;
In the above case Citus will parallelize the query and run 32 queries in total, one against each shard, then bring the results back to the coordinator and compute the final result. This means each individual query you run opens 32 connections from your coordinator to execute queries. This is great for parallelism, but the trade-off is concurrency: each such query consumes many connections, which limits how many queries you can run at the same time.
However, if we modify the query to include the car_id, Citus will execute a single query on the worker node where the data lives.
SELECT sensor_data.car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
AND car_id = '433P2C2S7TJ654181'
GROUP BY 1;
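One way to see the difference between the two plans, sketched here under the assumption that both tables are already distributed, is to prefix each query with EXPLAIN on the coordinator. The plan for the first query should report one task per shard, while the second should report a single task. The exact plan text varies by Citus version, so treat this as a way to investigate rather than a guaranteed output.

```sql
-- Fan-out query: no filter on the distribution column,
-- so every shard has to be consulted
EXPLAIN
SELECT car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
GROUP BY 1
ORDER BY 2 DESC;

-- Single-shard query: the car_id filter lets Citus route
-- the whole query to the one shard that holds that car
EXPLAIN
SELECT car_id, max(sensor_value)
FROM sensor_data
WHERE sensor_type = 'temperature'
AND car_id = '433P2C2S7TJ654181'
GROUP BY 1;
```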
Even if we were to expand the query to return the location data as well, because the data is co-located Citus knows it can push down the join to that single node.
SELECT sensor_data.car_id,
max(sensor_value),
location_data.latitude,
location_data.longitude
FROM sensor_data,
location_data
WHERE sensor_type = 'temperature'
AND sensor_data.car_id = '433P2C2S7TJ654181'
AND location_data.car_id = '433P2C2S7TJ654181'
AND sensor_data.car_id = location_data.car_id
AND sensor_data.timestamp = location_data.timestamp
GROUP BY sensor_data.car_id, location_data.latitude, location_data.longitude;
Again, if you anticipate a large data volume and issues scaling performance, your data model can be structured to be cleanly sharded, and your query workload fits well into Citus, then Citus gives you a lot of peace of mind to scale out. But where does time series come in?
Time series and Citus
Postgres already has native partitioning built in, but we often recommend coupling that with pg_partman, which extends native partitioning with helper utilities that make it easier to work with. Partitioning is the process of separating data by particular buckets into separate tables. In an IoT scenario you may want to retain data on all of your vehicles for the past year, but in most cases you only query the data for the last week. In that case you could partition your data by week. The last week or two of data, along with its correspondingly smaller indexes, then fits in memory more easily and is easier to maintain.
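To make the idea concrete, here is a minimal sketch of native partitioning done by hand in plain Postgres, which is the bookkeeping pg_partman automates. The table readings_example, its column recorded_at, and the partition names and dates are all hypothetical and used only for illustration.

```sql
-- Hypothetical table, not part of the schema in this post
CREATE TABLE readings_example (
    car_id VARCHAR(17) NOT NULL,
    sensor_value INT NOT NULL,
    recorded_at TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (recorded_at);

-- One child table per week; the upper bound is exclusive
CREATE TABLE readings_example_w1 PARTITION OF readings_example
    FOR VALUES FROM ('2023-10-02') TO ('2023-10-09');
CREATE TABLE readings_example_w2 PARTITION OF readings_example
    FOR VALUES FROM ('2023-10-09') TO ('2023-10-16');

-- With a filter on the partition key, the planner can skip
-- partitions whose range cannot match (partition pruning)
EXPLAIN
SELECT max(sensor_value)
FROM readings_example
WHERE recorded_at >= '2023-10-09'
  AND recorded_at < '2023-10-16';
```

The plan for the last statement should only reference readings_example_w2, which is why queries over recent data stay fast even as older partitions accumulate.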
In order to set up pg_partman with Citus we're going to start fresh and create our tables as partitioned tables. Here we can see the end-to-end setup, similar to the earlier Citus setup, but this time with partitioned tables:
CREATE TABLE sensor_data (
id SERIAL,
car_id VARCHAR(17) NOT NULL,
sensor_type VARCHAR(20) NOT NULL,
sensor_value INT NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);
CREATE TABLE location_data (
id SERIAL,
car_id VARCHAR(17) NOT NULL,
latitude float8,
longitude float8,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (timestamp);
SELECT create_distributed_table('sensor_data', 'car_id');
SELECT create_distributed_table('location_data', 'car_id');
Now if we look at our database, it still contains just our two tables, sensor_data and location_data:
\d
                        List of relations
 Schema |          Name           |       Type        |       Owner
--------+-------------------------+-------------------+-------------------
 public | location_data           | partitioned table | application
 public | location_data_id_seq    | sequence          | application
 public | pg_stat_statements      | view              | crunchy_superuser
 public | pg_stat_statements_info | view              | crunchy_superuser
 public | sensor_data             | partitioned table | application
 public | sensor_data_id_seq      | sequence          | application
(6 rows)
We've declared sensor_data and location_data as partitioned tables, but we haven't done anything to set up the initial partitions. Here we're going to leverage pg_partman to create them. We're going to have it create monthly partitions, though this could have been weekly, daily, or some other granularity. We're going to have it create partitions starting at 1 month ago:
-- The control column must be the column the table is partitioned on ("timestamp" in our schema).
-- The 'native' / 'monthly' arguments follow the pg_partman 4.x signature.
SELECT partman.create_parent('public.sensor_data', 'timestamp', 'native', 'monthly',
p_start_partition := (now() - interval '1 month')::date::text );
SELECT partman.create_parent('public.location_data', 'timestamp', 'native', 'monthly',
p_start_partition := (now() - interval '1 month')::date::text );
-- Configure partman to continue creating partitions
UPDATE partman.part_config SET infinite_time_partitions = true;
-- Configure partman to regularly run to create new partitions
SELECT cron.schedule('@hourly', $$SELECT partman.run_maintenance()$$);
So now we're running partitioned data inside each of our Citus shards.
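A quick way to sanity-check the setup, assuming pg_partman is installed in the partman schema and pg_cron is available in the current database, might look like this:

```sql
-- List the child partitions pg_partman created for each parent
SELECT * FROM partman.show_partitions('public.sensor_data');
SELECT * FROM partman.show_partitions('public.location_data');

-- Confirm the maintenance job is registered with pg_cron
SELECT jobid, schedule, command, active
FROM cron.job;
```

You should see at least the partitions covering last month and the current month, plus however many future partitions pg_partman is configured to pre-create.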
Long term data retention & columnar compression
The above approach for partitioning and sharding works great when it comes to building your application and keeping it performant. Then comes the cost management side of the equation. Retaining all data forever would be valuable if it were free, but keeping all of your data easily queryable has a real cost. Enter Citus columnar support, which comes with a few caveats:
- No support for updates or deletes
- No support for logical replication or decoding
Fortunately for us, our IoT use case can still take full advantage of the columnar format, which provides:
- Great storage compression
- Faster querying when scanning lots of sequential data
Let's look at turning a table into a columnar one:
SELECT alter_table_set_access_method('sensor_data_2023_oct', 'columnar');
This will change the partition for sensor_data in October into a columnar format.
We can now run a VACUUM VERBOSE sensor_data on the table and see that we have a 10.20x compression rate!
VACUUM VERBOSE sensor_data;
INFO: statistics for "sensor_data":
storage id: 10000000068
total file size: 64897024, total data size: 64252933
compression rate: 10.20x
total row count: 11999999, stripe count: 80, average rows per stripe: 149999
chunk count: 6000, containing data for dropped columns: 0, zstd compressed: 6000
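To check which partitions have been converted so far, one option is to read the access method straight from the Postgres catalogs. This is a sketch that assumes the parent table is public.sensor_data as in this post.

```sql
-- Show the storage access method (heap or columnar) of every
-- direct child partition of sensor_data
SELECT c.relname AS partition_name,
       am.amname AS access_method
FROM pg_inherits i
JOIN pg_class c ON c.oid = i.inhrelid
JOIN pg_am am ON am.oid = c.relam
WHERE i.inhparent = 'public.sensor_data'::regclass
ORDER BY c.relname;
```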
Because our IoT data generally comes in within a set period of time, and is immutable after a specific date, we can then go and compress partitions after a certain period of time. In this case we’re going to convert all partitions older than 3 months. Bear with us, because the incantation for it (which you could schedule with pg_cron) is a bit gnarly, but it gets the job done:
DO $accessmethod$
DECLARE
    v_row_partitions record;
    v_row_info record;
    v_sql text;
BEGIN
    -- Walk every child partition of the parent table
    FOR v_row_partitions IN
        SELECT partition_schemaname||'.'||partition_tablename AS partition_name FROM partman.show_partitions('public.sensor_data')
    LOOP
        FOR v_row_info IN
            SELECT child_start_time, child_end_time FROM partman.show_partition_info(v_row_partitions.partition_name)
        LOOP
            -- Convert only partitions whose time range ended more than 3 months ago
            IF v_row_info.child_end_time < CURRENT_TIMESTAMP - '3 months'::interval THEN
                v_sql := format('SELECT alter_table_set_access_method(%L, %L)', v_row_partitions.partition_name, 'columnar');
                EXECUTE v_sql;
            END IF;
        END LOOP;
    END LOOP;
END
$accessmethod$;
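If you want this to run on a schedule, one possible approach is to wrap the same logic in a procedure and register it with pg_cron. The sketch below is an assumption-laden illustration: the procedure name compress_old_partitions, its parameters, the job name, and the 03:00 daily schedule are all hypothetical, and the three-argument form of cron.schedule requires a pg_cron version that supports named jobs. It also skips partitions that are already columnar, on the assumption that repeated runs should not try to convert them again.

```sql
-- Hypothetical helper; name and parameters are illustrative only
CREATE OR REPLACE PROCEDURE compress_old_partitions(p_parent text, p_older_than interval)
LANGUAGE plpgsql
AS $proc$
DECLARE
    v_part record;
    v_info record;
BEGIN
    FOR v_part IN
        SELECT partition_schemaname || '.' || partition_tablename AS partition_name
        FROM partman.show_partitions(p_parent)
    LOOP
        -- Skip partitions that already use the columnar access method
        CONTINUE WHEN EXISTS (
            SELECT 1
            FROM pg_class c
            JOIN pg_am am ON am.oid = c.relam
            WHERE c.oid = v_part.partition_name::regclass
              AND am.amname = 'columnar'
        );

        SELECT child_start_time, child_end_time INTO v_info
        FROM partman.show_partition_info(v_part.partition_name);

        -- Convert only partitions that ended before the cutoff
        IF v_info.child_end_time < CURRENT_TIMESTAMP - p_older_than THEN
            PERFORM alter_table_set_access_method(v_part.partition_name::regclass, 'columnar');
        END IF;
    END LOOP;
END
$proc$;

-- Hypothetical job name and schedule: every day at 03:00
SELECT cron.schedule('compress-old-partitions', '0 3 * * *',
    $$CALL compress_old_partitions('public.sensor_data', '3 months')$$);
```

A second job with 'public.location_data' as the first argument would cover the location table in the same way.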
Your scalable IoT database with Citus, pg_partman, and columnar compression
And there we have it, a horizontally scalable database for an IoT workload driven by:
- Citus based sharding for seamless scaling and performance
- pg_partman for native time-series partitioning, giving us faster query recall and reporting
- Columnar compression to help us better manage storage and longer term retention