Incremental datasets

Learn how to configure tables that update incrementally.

Introduction

Incremental datasets aren't rebuilt from scratch every time they run. Instead, only new rows are inserted (or merged) into the dataset according to the conditions you provide when configuring the dataset.

Dataform takes care of managing state, creating datasets, and generating INSERT (or MERGE) statements for you.

Example: Append only for performance

Web logs or analytics data are great use cases for incremental datasets. For these kinds of data sources you usually only want to process new records instead of having to reprocess old data.

Example: Micro-batching for latency

A major benefit of incremental datasets is that pipelines complete more quickly and can therefore be run more frequently at lower cost, reducing downstream latency to consumers of these datasets.

Example: Creating daily snapshots

A common incremental dataset use case is to create daily snapshots of an input dataset, for example to use in longitudinal analysis of user settings stored in a production database.

Key concepts

Incremental datasets are inherently stateful and make use of INSERT statements. Thus, care must be taken to ensure that data is not erased or duplicated during insertion of new rows.

There are 2 parts to an incremental dataset that you must configure:

  • The main query that selects all rows
  • A WHERE clause that determines which subset of rows should be processed for each incremental run

WHERE clause

The WHERE clause is applied to the main query and ensures that only new data is added to the incremental dataset. For example, where ts is a timestamp column on both the source and the dataset being written to (target_table):

ts > (SELECT MAX(ts) FROM target_table)

A simple example

Assume a source dataset of timestamped user action events called weblogs.user_actions, streamed into the warehouse by some other data integration tool:

timestamp    user_id  action
1502920304   03940    create_project
1502930293   20492    logout
1502940292   30920    login
...          ...      ...

To make a simple incremental copy of this dataset, create a new file called definitions/example_incremental.sqlx:

config { type: "incremental" }

SELECT timestamp, action
FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

First, the script sets the type of the dataset to incremental.

It then specifies a WHERE clause using the when() and incremental() functions:

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

This ensures that only rows from the source dataset with a timestamp greater than the latest timestamp that has been processed so far are selected in the incremental query.

Note that self() is used here in order to get the name of the current dataset. Thus the compiled WHERE clause will be expanded to:

timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)

Note that for BigQuery, incremental tables need to be written slightly differently to avoid full table scans. See our [BigQuery specific guide](/warehouses/bigquery#optimizing-partitioned-incremental-tables-for-bigquery) for more info.

This dataset may not exist in the warehouse yet. That's OK, because the WHERE clause will only be added to the final query if the dataset already exists and new data is being inserted into it.

Note that when data is inserted into an incremental dataset, only fields that already exist in the dataset will be written. To make sure that new fields are written after changing the query, the dataset must be rebuilt from scratch with the --full-refresh option (if using the Dataform tool on the command line) or with the Run with full refresh option in Dataform Web.
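For example, when using the Dataform CLI, a full refresh is typically triggered as follows (a sketch assuming the standard dataform run command):

# Rebuild incremental datasets from scratch instead of inserting into them
dataform run --full-refresh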

Generated SQL

The SQL generated by the above example depends on the warehouse type, but generally follows the same format.

If the dataset doesn't exist yet:

CREATE OR REPLACE TABLE default_schema.example_incremental AS
  SELECT timestamp, action
  FROM weblogs.user_actions;

When incrementally inserting new rows:

INSERT INTO default_schema.example_incremental (timestamp, action)
  SELECT timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)

If no unique key is specified, then on warehouses where Dataform generates a MERGE statement, the merge condition is set to FALSE, causing rows to always be inserted rather than merged.
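As an illustrative sketch (the general shape, not the exact statement Dataform emits), a MERGE whose condition is FALSE behaves just like a plain INSERT:

-- With ON FALSE, no source row ever matches an existing row,
-- so every row falls through to the INSERT branch.
MERGE default_schema.example_incremental T
USING (
  SELECT timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
) S
ON FALSE
WHEN NOT MATCHED THEN
  INSERT (timestamp, action) VALUES (timestamp, action)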

A merge example

Incremental merging requires @dataform/core version 1.5.0.
Incremental merging is not currently supported for Azure SQL Data Warehouse.

If you want to ensure that the output table only ever contains one row per some combination of key columns, you should specify a uniqueKey (the key columns must be present in the query's output). When uniqueKey is specified, if a row arrives whose key matches an existing row's key, then the existing row is overwritten with the new data.

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT transaction_id, timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

If using uniqueKey with BigQuery, we recommend that you set an updatePartitionFilter to only consider a subset of records. This helps to optimize costs, since without an updatePartitionFilter, BigQuery will scan the whole table to find matching rows.

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT transaction_id, timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

Generated SQL

As above, the SQL generated by this example depends on the warehouse type, but generally follows the same format.

If the dataset doesn't exist yet:

CREATE OR REPLACE TABLE default_schema.example_incremental
PARTITION BY DATE(timestamp) AS
  SELECT transaction_id, timestamp, action
  FROM weblogs.user_actions;

When incrementally merging new rows:

MERGE default_schema.example_incremental T
USING (
  SELECT transaction_id, timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
) S
ON T.transaction_id = S.transaction_id
  AND T.timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
WHEN MATCHED THEN
  UPDATE SET transaction_id = S.transaction_id, timestamp = S.timestamp, action = S.action
WHEN NOT MATCHED THEN
  INSERT (transaction_id, timestamp, action) VALUES (transaction_id, timestamp, action)

Daily snapshots with incremental datasets

Incremental datasets can be used to create a daily snapshot of mutable external datasets.

Assuming an external dataset called productiondb.customers, you could write the following incremental dataset to create a daily snapshot of that data:

config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

${ when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }

  • By selecting the current date as snapshot_date, this effectively appends a dated snapshot of the productiondb.customers dataset to the output dataset each day.
  • The WHERE clause prevents more than one insertion per day (which could result in duplicate rows in the output dataset), since any newly inserted rows must have a later snapshot_date than any row already in the output.
  • This Dataform project should be scheduled to run at least once a day.
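Once a few snapshots have accumulated, longitudinal analysis becomes a simple GROUP BY over snapshot_date. A minimal sketch, assuming the dataset above compiled to default_schema.customer_snapshots (a hypothetical name):

-- Hypothetical downstream query: the customer count in each daily snapshot
SELECT snapshot_date, COUNT(*) AS total_customers
FROM default_schema.customer_snapshots
GROUP BY snapshot_date
ORDER BY snapshot_date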

Protecting incremental datasets from data loss

It's possible to force an incremental dataset to be rebuilt from scratch, using either the --full-refresh option on the command line interface or the Run with full refresh option in Dataform Web.

If you need to protect a dataset from ever being rebuilt from scratch, for example if the source data is only temporary, you can mark an incremental dataset as protected. This means that Dataform will never delete this dataset, even if a user requests a full refresh.

definitions/incremental_example_protected.sqlx:

config {
  type: "incremental",
  protected: true
}
SELECT ...

Table partitioning / dist keys

Since incremental datasets are usually timestamp based, it's a best practice to set up dataset partitioning on the timestamp field to speed up downstream queries.
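For example, on BigQuery this can be configured with the partitionBy option used earlier in this guide:

config {
  type: "incremental",
  bigquery: {
    // Partition the output table by day on the timestamp field
    partitionBy: "DATE(timestamp)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }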

For more information, check out the BigQuery and Redshift guides.

What's next

Publish datasets

Learn how to publish tables and views in your warehouse.

Document datasets

Learn how to add data documentation for your tables and views.
