
Postgres, dbt, and Iceberg: Scalable Data Transformation

Aykut Bozkurt


Seamless integration of dbt with Crunchy Data Warehouse automates data movement between Postgres and Apache Iceberg. dbt’s modular SQL approach, combined with Iceberg’s scalable storage and Postgres’ query engine, means you can build fast, efficient, and reliable analytics with minimal complexity.

Today let’s dig into an example of using dbt with Postgres and Iceberg. The steps will be:

  1. Set up Iceberg tables in Crunchy Data Warehouse using real-world real-time data from GitHub events
  2. Configure dbt to transform and summarize the data with rollups/aggregations
  3. Utilize incremental models to process new data
  4. Query and analyze the results for insights with Postgres
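
Throughout these steps, dbt connects to Crunchy Data Warehouse like any other Postgres database. As a minimal sketch, a profiles.yml using the standard dbt-postgres adapter could look like the following (the profile name, hostname, and credentials are placeholders, not values from this setup; the schema echoes the crunchy_demos_* schema seen in the outputs later, but is still an assumption):

# ~/.dbt/profiles.yml -- minimal sketch, assuming the standard dbt-postgres adapter
crunchy_demos:
  target: dev
  outputs:
    dev:
      type: postgres
      host: your-cluster.example.com        # placeholder hostname
      port: 5432
      user: postgres
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: postgres
      schema: crunchy_demos
      threads: 4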

Creating Iceberg tables with dbt

Typically, when working with a data warehouse, you’ll initially create and stage your source table, then have other systems operate on top of it. Here, instead of manually creating the source table, we can use a dbt macro to automate the process. Creating Iceberg tables with dbt allows you to keep your data pipelines under version control and test them locally. Below is a sample dbt macro that defines the source table to efficiently store and process the events:

{% macro create_crunchy_events() %}

{% set sql %}
    set crunchy_iceberg.default_location_prefix TO '{{ env_var('ICEBERG_LOCATION_PREFIX', '') }}';

    create schema if not exists crunchy_gh;

    create table crunchy_gh.events (
        id text,
        type text,
        actor text,
        repo text,
        payload text,
        public boolean,
        created_at timestamptz,
        org text)
    using iceberg;
{% endset %}

{% do run_query(sql) %}
{% do log("create_crunchy_events finished", info=True) %}
{% endmacro %}

Before creating the source table, let’s set the location prefix where our Iceberg table’s files will be stored:

export ICEBERG_LOCATION_PREFIX='s3://v5zf6dobuac3rmwxnzykbdncdqzckxzh/6xl6nijprvcp3i2dolnfcv6l4e'

You can run the macro to create the source table as shown below:

dbt run-operation create_crunchy_events
postgres=# \d crunchy_gh.events
                          Foreign table "crunchy_gh.events"
   Column   |           Type           | Collation | Nullable | Default | FDW options
------------+--------------------------+-----------+----------+---------+-------------
 id         | text                     |           |          |         |
 type       | text                     |           |          |         |
 actor      | text                     |           |          |         |
 repo       | text                     |           |          |         |
 payload    | text                     |           |          |         |
 public     | boolean                  |           |          |         |
 created_at | timestamp with time zone |           |          |         |
 org        | text                     |           |          |         |
Server: crunchy_iceberg
FDW options: (location 's3://ipmikgqfjhtnenhmfu2nek7v43pmwxdk/feooahhfg5eolm7js2dsbhg7kq/postgres/crunchy_gh/events/16802')

Ingesting sample data from GitHub events in S3

In this example, we’ll use GitHub events for the repos in the Crunchy Data organization, covering events such as an issue being opened or a pull request being commented on. The GitHub event data is exposed by ClickHouse at a public URL: s3://clickhouse-public-datasets/gharchive/original/.

To load new data into the source table, a dbt macro fetches GitHub events for a given date and inserts them into the table:

{% macro copy_crunchy_events(event_date) %}
{% set sql %}
    CREATE OR REPLACE PROCEDURE copy_crunchy_events(event_date date)
    LANGUAGE plpgsql
    AS $$
    BEGIN

        FOR h IN 0..23 LOOP
            RAISE NOTICE 'Executing hour: %', h;

            BEGIN
                EXECUTE 'COPY crunchy_gh.events
                         FROM ''s3://clickhouse-public-datasets/gharchive/original/' || event_date || '-' || h || '.json.gz''
                         WITH (format ''json'')
                         WHERE repo LIKE ''%%CrunchyData/%%'';';
            EXCEPTION
                WHEN OTHERS THEN
                    -- sometimes files are not compressed as expected
                    EXECUTE 'COPY crunchy_gh.events
                             FROM ''s3://clickhouse-public-datasets/gharchive/original/' || event_date || '-' || h || '.json.gz''
                             WITH (format ''json'', compression ''none'')
                             WHERE repo LIKE ''%%CrunchyData/%%'';';
            END;
        END LOOP;

    END
    $$;

    CALL copy_crunchy_events('{{ event_date }}');
{% endset %}

{% do run_query(sql) %}
{% do log("copy_crunchy_events finished", info=True) %}
{% endmacro %}

You can ingest the events for a specific day as shown below:

dbt run-operation copy_crunchy_events --args "{event_date: 2024-10-17}"
postgres=# select count(*) from crunchy_gh.events;
 count
-------
    97
(1 row)

postgres=# select * from crunchy_gh.events limit 2;
-[ RECORD 1 ]-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id         | 42934521395
type       | ForkEvent
actor      | {"id":144409824,"login":"sagar-shrestha24","display_login":"sagar-shrestha24","gravatar_id":"","url":"https://api.github.com/users/sagar-shrestha24","avatar_url":"https://avatars.githubusercontent.com/u/144409824?"}
repo       | {"id":362921285,"name":"CrunchyData/postgres-operator-examples","url":"https://api.github.com/repos/CrunchyData/postgres-operator-examples"}
payload    | {"forkee":{"id":874076753,"node_id":"R_kgDONBlaUQ","name":"postgres-operator-examples","full_name":"sagar-shrestha24/postgres-operator-examples","private":false,"public":true}}
public     | t
created_at | 2024-10-17 08:04:13+00
org        | {"id":8248870,"login":"CrunchyData","gravatar_id":"","url":"https://api.github.com/orgs/CrunchyData","avatar_url":"https://avatars.githubusercontent.com/u/8248870?"}
-[ RECORD 2 ]-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id         | 42931737143
type       | ForkEvent
actor      | {"id":8130747,"login":"ISingularity","display_login":"ISingularity","gravatar_id":"","url":"https://api.github.com/users/ISingularity","avatar_url":"https://avatars.githubusercontent.com/u/8130747?"}
repo       | {"id":83363132,"name":"CrunchyData/postgres-operator","url":"https://api.github.com/repos/CrunchyData/postgres-operator"}
payload    | {"forkee":{"id":874036356,"node_id":"R_kgDONBi8hA","name":"postgres-operator","full_name":"ISingularity/postgres-operator","private":false,"public":true}}
public     | t
created_at | 2024-10-17 06:33:32+00
org        | {"id":8248870,"login":"CrunchyData","gravatar_id":"","url":"https://api.github.com/orgs/CrunchyData","avatar_url":"https://avatars.githubusercontent.com/u/8248870?"}

In the next section, we’ll configure dbt to use this table as a source and start building transformations to get daily stars for the repos under the Crunchy Data organization!

Transforming the Iceberg table with dbt

The transformation process in dbt involves defining a model, which specifies how we want the data to be transformed from the raw source table into a more refined dataset. We’ll explain the model configuration and SQL logic used in this process, along with the key feature of incremental processing.
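
Before writing the model itself, the raw events table is declared as a dbt source so the model can reference it with {{ source('crunchy_gh', 'events') }}. A minimal sketch of that declaration, assuming a conventional models/sources.yml file (the file name and layout are a convention, not taken from the original project):

# models/sources.yml -- declares the raw Iceberg table as a dbt source
version: 2

sources:
  - name: crunchy_gh
    schema: crunchy_gh
    tables:
      - name: events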

Model Configuration

In dbt, the model configuration controls how the transformation process behaves. For this transformation, we use the incremental materialization, which is key to processing new data without reprocessing the entire dataset. The configuration includes a couple of important options:

  • materialized='incremental': This tells dbt to perform incremental updates instead of fully rebuilding the table each time.
  • unique_key='created_at': This specifies the unique identifier for each record, used to detect new records.
  • pre_hook and post_hook: These hooks are executed before and after the model runs. In this case, the pre_hook sets the default access method to iceberg and configures the location prefix for storing Iceberg tables in S3. The post_hook resets these settings after the model has completed.

{{
    config(
        materialized='incremental',
        unique_key='created_at',
        pre_hook="SET default_table_access_method TO 'iceberg'; SET crunchy_iceberg.default_location_prefix = '{{ env_var('ICEBERG_LOCATION_PREFIX', '') }}';",
        post_hook='RESET default_table_access_method; RESET crunchy_iceberg.default_location_prefix;'
    )
}}

dbt's incremental processing ensures that we only process the data that has changed, reducing computational cost.

dbt SQL for data summary and rollup

The transformation SQL filters the source table down to WatchEvent (star) events, groups them by day and repo, and counts the number of stars each repo received per day.

select date_trunc('day', created_at)::date as created_at,
       (repo::jsonb)->>'name' AS repo,
       count(*) as stars
from {{ source('crunchy_gh', 'events') }}
where type = 'WatchEvent'
group by date_trunc('day', created_at)::date, (repo::jsonb)->>'name'

{% if is_incremental() %}
having (date_trunc('day', created_at)::date >= (select coalesce(max(created_at),'1900-01-01') from {{ this }} ))
{% endif %}

The key part of this SQL is the conditional HAVING clause, which ensures that only new records are processed during the incremental runs. Here's how it works:

  • The is_incremental() function checks if dbt is running in incremental mode.
  • If the run is incremental, the HAVING clause filters the records to only include those with a created_at value that is greater than or equal to the latest date in the already processed data (select coalesce(max(created_at), '1900-01-01') from {{ this }}).

This ensures that dbt only processes the new data that has been ingested since the last run, making the transformation process more efficient.
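
To make that concrete, here is roughly what the model compiles to on an incremental run, with source() and {{ this }} resolved to the tables used in this example (a simplified sketch; the exact statements dbt wraps around it depend on the adapter and the configured incremental strategy):

-- On the first run there is no HAVING filter, so the full history is aggregated.
-- On incremental runs, only days at or after the latest processed day are aggregated:
select date_trunc('day', created_at)::date as created_at,
       (repo::jsonb)->>'name' as repo,
       count(*) as stars
from crunchy_gh.events
where type = 'WatchEvent'
group by date_trunc('day', created_at)::date, (repo::jsonb)->>'name'
having date_trunc('day', created_at)::date >=
       (select coalesce(max(created_at), '1900-01-01') from crunchy_demos_crunchy_gh.daily_stars);

dbt then reconciles these rows with the existing table using unique_key = 'created_at', via delete+insert or merge depending on the incremental strategy in use.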

Let’s run the model to create its table and feed it with initial data:

dbt build --models daily_stars
postgres=# select * from crunchy_demos_crunchy_gh.daily_stars order by created_at;
 created_at |             repo              | stars
------------+-------------------------------+-------
 2024-10-17 | CrunchyData/pg_tileserv       |     1
 2024-10-17 | CrunchyData/postgres-operator |     1
 2024-10-17 | CrunchyData/pg_parquet        |    74
(3 rows)

Now assume it’s the next day and we want to ingest the new data:

dbt run-operation copy_crunchy_events --args "{event_date: 2024-10-18}"
postgres=# select count(*) from crunchy_gh.events;
 count
-------
   242
(1 row)

Then, rerun the model to incrementally process the new events from the source table to update the daily stars:

dbt build --models daily_stars
postgres=# select * from crunchy_demos_crunchy_gh.daily_stars order by created_at;
 created_at |              repo               | stars
------------+---------------------------------+-------
 2024-10-17 | CrunchyData/pg_tileserv         |     1
 2024-10-17 | CrunchyData/postgres-operator   |     1
 2024-10-17 | CrunchyData/pg_parquet          |    74
 2024-10-18 | CrunchyData/pgCompare           |     2
 2024-10-18 | CrunchyData/pg_parquet          |   118
 2024-10-18 | CrunchyData/postgres-operator   |     2
 2024-10-18 | CrunchyData/pgmonitor-extension |     1
(7 rows)
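
Since the rollup is just another Postgres table backed by Iceberg, any SQL works for further analysis. For example, a hypothetical query computing cumulative stars per repo over time:

select repo,
       created_at,
       stars,
       sum(stars) over (partition by repo order by created_at) as cumulative_stars
from crunchy_demos_crunchy_gh.daily_stars
order by repo, created_at;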

Summary

We're excited about the new automation capabilities for scalable analytics solutions across Postgres and Iceberg using dbt and Crunchy Data Warehouse. This integration can make real-time analytics in Postgres more accessible to any organization.

We just looked at how dbt can create scalable, version-controlled automation with Iceberg and Postgres in several ways:

  • A dbt macro that automates the creation of Iceberg tables
  • A custom dbt macro that loads event data from an S3 dataset using Postgres' COPY command
  • Incremental models in dbt that process only new records
  • A dbt SQL transformation model that aggregates event data by day for easy analytics

As you start working with dbt, Iceberg, and Postgres, we'd love to hear from you.