Postgres, dbt, and Iceberg: Scalable Data Transformation
Aykut Bozkurt
Seamless integration of dbt with Crunchy Data Warehouse automates data movement between Postgres and Apache Iceberg. dbt’s modular SQL approach, combined with Iceberg’s scalable storage, and Postgres’ query engine means you can build fast, efficient, and reliable analytics—with minimal complexity.
Today let’s dig into an example of using dbt with Postgres and Iceberg. The steps will be:
- Set up Iceberg tables in Crunchy Data Warehouse using real-world real-time data from GitHub events
- Configure dbt to transform and summarize the data with rollups/aggregations
- Utilize incremental models to process new data
- Query and analyze the results for insights with Postgres
Creating Iceberg tables with dbt
Typically when working with a data warehouse, you'll first create and stage your source table, then have other systems operate on top of it. Here, instead of manually creating the source table, we can use a dbt macro to automate the process. Creating Iceberg tables with dbt allows you to keep your data pipelines under version control and test them locally. Below is a sample dbt macro that defines the source table to efficiently store and process the events:
{% macro create_crunchy_events() %}
  {% set sql %}
    set crunchy_iceberg.default_location_prefix TO '{{ env_var('ICEBERG_LOCATION_PREFIX', '') }}';

    create schema if not exists crunchy_gh;

    create table crunchy_gh.events (
      id text,
      type text,
      actor text,
      repo text,
      payload text,
      public boolean,
      created_at timestamptz,
      org text)
    using iceberg;
  {% endset %}

  {% do run_query(sql) %}
  {% do log("create_crunchy_events finished", info=True) %}
{% endmacro %}
Before creating the source table, let's set the location prefix where the Iceberg table's files will be stored:
export ICEBERG_LOCATION_PREFIX='s3://v5zf6dobuac3rmwxnzykbdncdqzckxzh/6xl6nijprvcp3i2dolnfcv6l4e'
You can run the macro to create the source table as shown below:
dbt run-operation create_crunchy_events
postgres=# \d crunchy_gh.events
Foreign table "crunchy_gh.events"
Column | Type | Collation | Nullable | Default | FDW options
------------+--------------------------+-----------+----------+---------+-------------
id | text | | | |
type | text | | | |
actor | text | | | |
repo | text | | | |
payload | text | | | |
public | boolean | | | |
created_at | timestamp with time zone | | | |
org | text | | | |
Server: crunchy_iceberg
FDW options: (location 's3://ipmikgqfjhtnenhmfu2nek7v43pmwxdk/feooahhfg5eolm7js2dsbhg7kq/postgres/crunchy_gh/events/16802')
Ingesting sample data from GitHub events in S3
In this example, we'll use GitHub events for the Crunchy Data repos, covering events such as an issue being opened or a pull request being commented on. The GitHub event data is exposed by ClickHouse at a public URL: s3://clickhouse-public-datasets/gharchive/original/.
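Crunchy Data Warehouse's COPY command can read these gzipped JSON files directly from S3, so you can preview a single hour's file with a one-off statement in psql before automating the load. A minimal sketch (the file name follows the dataset's date-hour naming pattern; when run directly rather than through dbt, the LIKE pattern needs only single % wildcards):

COPY crunchy_gh.events
FROM 's3://clickhouse-public-datasets/gharchive/original/2024-10-17-0.json.gz'
WITH (format 'json')
WHERE repo LIKE '%CrunchyData/%';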
To load new data into the source table, a dbt macro will fetch GitHub events for a given date and insert them into the table:
{% macro copy_crunchy_events(event_date) %}
  {% set sql %}
    CREATE OR REPLACE PROCEDURE copy_crunchy_events(event_date date)
    LANGUAGE plpgsql
    AS $$
    BEGIN
      FOR h IN 0..23 LOOP
        RAISE NOTICE 'Executing hour: %', h;

        BEGIN
          EXECUTE 'COPY crunchy_gh.events
                   FROM ''s3://clickhouse-public-datasets/gharchive/original/' || event_date || '-' || h || '.json.gz''
                   WITH (format ''json'')
                   WHERE repo LIKE ''%%CrunchyData/%%'';';
        EXCEPTION
          WHEN OTHERS THEN
            -- sometimes files are not compressed as expected
            EXECUTE 'COPY crunchy_gh.events
                     FROM ''s3://clickhouse-public-datasets/gharchive/original/' || event_date || '-' || h || '.json.gz''
                     WITH (format ''json'', compression ''none'')
                     WHERE repo LIKE ''%%CrunchyData/%%'';';
        END;
      END LOOP;
    END
    $$;

    CALL copy_crunchy_events('{{ event_date }}');
  {% endset %}

  {% do run_query(sql) %}
  {% do log("copy_crunchy_events finished", info=True) %}
{% endmacro %}
You can simply ingest events of a specific day as shown below:
dbt run-operation copy_crunchy_events --args "{event_date: 2024-10-17}"
postgres=# select count(*) from crunchy_gh.events;
count
-------
97
(1 row)
postgres=# select * from crunchy_gh.events;
-[ RECORD 1 ]-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id | 42934521395
type | ForkEvent
actor | {"id":144409824,"login":"sagar-shrestha24","display_login":"sagar-shrestha24","gravatar_id":"","url":"https://api.github.com/users/sagar-shrestha24","avatar_url":"https://avatars.githubusercontent.com/u/144409824?"}
repo | {"id":362921285,"name":"CrunchyData/postgres-operator-examples","url":"https://api.github.com/repos/CrunchyData/postgres-operator-examples"}
payload | {"forkee":{"id":874076753,"node_id":"R_kgDONBlaUQ","name":"postgres-operator-examples","full_name":"sagar-shrestha24/postgres-operator-examples","private":false,"public":true}}
public | t
created_at | 2024-10-17 08:04:13+00
org | {"id":8248870,"login":"CrunchyData","gravatar_id":"","url":"https://api.github.com/orgs/CrunchyData","avatar_url":"https://avatars.githubusercontent.com/u/8248870?"}
-[ RECORD 2 ]-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id | 42931737143
type | ForkEvent
actor | {"id":8130747,"login":"ISingularity","display_login":"ISingularity","gravatar_id":"","url":"https://api.github.com/users/ISingularity","avatar_url":"https://avatars.githubusercontent.com/u/8130747?"}
repo | {"id":83363132,"name":"CrunchyData/postgres-operator","url":"https://api.github.com/repos/CrunchyData/postgres-operator"}
payload | {"forkee":{"id":874036356,"node_id":"R_kgDONBi8hA","name":"postgres-operator","full_name":"ISingularity/postgres-operator","private":false,"public":true}}
public | t
created_at | 2024-10-17 06:33:32+00
org | {"id":8248870,"login":"CrunchyData","gravatar_id":"","url":"https://api.github.com/orgs/CrunchyData","avatar_url":"https://avatars.githubusercontent.com/u/8248870?"}
In the next section, we'll configure dbt to use this table as a source and start building transformations to get daily stars for the repos under the Crunchy Data organization!
Transform Iceberg table via dbt
The transformation process in dbt involves defining a model, which specifies how we want the data to be transformed from the raw source table into a more refined dataset. We’ll explain the model configuration and SQL logic used in this process, along with the key feature of incremental processing.
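The model references the raw events table through a dbt source. A minimal source definition could look like the following (a sketch; the YAML file name and layout are up to your project, while the schema and table names match the table created earlier):

version: 2

sources:
  - name: crunchy_gh
    schema: crunchy_gh
    tables:
      - name: events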
Model Configuration
In dbt, the model configuration controls how the transformation process behaves. For this transformation, we use the incremental materialization, which is key to processing new data without reprocessing the entire dataset. The configuration includes a couple of important options:
- materialized='incremental': This tells dbt to perform incremental updates instead of fully rebuilding the table each time.
- unique_key='created_at': This specifies the unique identifier for each record, used to detect new records.
- pre_hook and post_hook: These hooks are executed before and after the model runs. In this case, the pre_hook sets the default access method to iceberg and configures the location prefix for storing Iceberg tables in S3. The post_hook resets these settings after the model has completed.
{{
  config(
    materialized = 'incremental',
    unique_key = 'created_at',
    pre_hook = "SET default_table_access_method TO 'iceberg'; SET crunchy_iceberg.default_location_prefix = '{{ env_var('ICEBERG_LOCATION_PREFIX', '') }}';",
    post_hook = 'RESET default_table_access_method; RESET crunchy_iceberg.default_location_prefix;'
  )
}}
dbt's incremental processing ensures that we only process the data that has changed, reducing computational cost.
dbt SQL for data summary and rollup
The transformation SQL aggregates the events from the source table, groups them by day and repo, and counts the number of stars (WatchEvents) each repo received on each day.
select date_trunc('day', created_at)::date as created_at,
(repo::jsonb)->>'name' AS repo,
count(*) as stars
from {{ source('crunchy_gh', 'events') }}
where type = 'WatchEvent'
group by date_trunc('day', created_at)::date, (repo::jsonb)->>'name'
{% if is_incremental() %}
having (date_trunc('day', created_at)::date >= (select coalesce(max(created_at),'1900-01-01') from {{ this }} ))
{% endif %}
The key part of this SQL is the conditional HAVING clause, which ensures that only new records are processed during incremental runs. Here's how it works:
- The is_incremental() function checks if dbt is running in incremental mode.
- If the run is incremental, the HAVING clause filters the records to only include those with a created_at value that is greater than or equal to the latest date in the already processed data (select coalesce(max(created_at), '1900-01-01') from {{ this }}).
This ensures that dbt only processes the new data that has been ingested since the last run, making the transformation process more efficient.
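Under the hood, an incremental run materializes the model's query into a temporary relation and then reconciles it with the existing table using the unique_key. Roughly, it behaves like the following sketch (illustrative only; the exact compiled SQL depends on the adapter and incremental strategy, and the temporary relation name is hypothetical):

-- delete rows for days that are being recomputed, then insert the fresh rollups
delete from crunchy_demos_crunchy_gh.daily_stars
where created_at in (select created_at from daily_stars__dbt_tmp);

insert into crunchy_demos_crunchy_gh.daily_stars (created_at, repo, stars)
select created_at, repo, stars
from daily_stars__dbt_tmp;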
Let’s run the model to create its table and feed it with initial data:
dbt build --models daily_stars
postgres=# select * from crunchy_demos_crunchy_gh.daily_stars order by created_at;
created_at | repo | stars
------------+-------------------------------+-------
2024-10-17 | CrunchyData/pg_tileserv | 1
2024-10-17 | CrunchyData/postgres-operator | 1
2024-10-17 | CrunchyData/pg_parquet | 74
(3 rows)
Now assume it's the next day and we want to ingest the new data:
dbt run-operation copy_crunchy_events --args "{event_date: 2024-10-18}"
postgres=# select count(*) from crunchy_gh.events;
count
-------
242
(1 row)
Then, rerun the model to incrementally process the new events from the source table to update the daily stars:
dbt build --models daily_stars
postgres=# select * from crunchy_demos_crunchy_gh.daily_stars order by created_at;
created_at | repo | stars
------------+---------------------------------+-------
2024-10-17 | CrunchyData/pg_tileserv | 1
2024-10-17 | CrunchyData/postgres-operator | 1
2024-10-17 | CrunchyData/pg_parquet | 74
2024-10-18 | CrunchyData/pgCompare | 2
2024-10-18 | CrunchyData/pg_parquet | 118
2024-10-18 | CrunchyData/postgres-operator | 2
2024-10-18 | CrunchyData/pgmonitor-extension | 1
(7 rows)
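With the rollup in place, the daily_stars table can be queried directly in Postgres for further analysis. For example, ranking repositories by total stars over the loaded period (a sketch to extend as more days are ingested):

select repo,
       sum(stars) as total_stars,
       min(created_at) as first_day,
       max(created_at) as last_day
from crunchy_demos_crunchy_gh.daily_stars
group by repo
order by total_stars desc;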
Summary
We're excited about the new automation capabilities for scalable analytics solutions across Postgres and Iceberg using dbt and Crunchy Data Warehouse. This integration can make real-time analytics in Postgres more accessible to any organization.
We just looked at how dbt can create scalable, version-controlled automation with Iceberg and Postgres in different ways, like:
- A dbt macro that automates the creation of Iceberg tables
- A custom dbt macro that loads event data from an S3 dataset using Postgres' COPY command
- Incremental processing in dbt for processing only new records
- A dbt SQL transformation model that aggregates event data by day for easy analytics
As you start working with dbt, Iceberg, and Postgres, we'd love to hear from you.