Incremental datasets aren't rebuilt from scratch every time they run. Instead, only new rows are inserted (or merged) into the dataset according to the conditions you provide when configuring the dataset.
Dataform takes care of managing state, creating datasets, and generating `INSERT` (or `MERGE`) statements for you.
Web logs or analytics data are great use cases for incremental datasets. For these kinds of data sources you usually only want to process new records instead of having to reprocess old data.
A major benefit of incremental datasets is that pipelines complete more quickly and can therefore be run more frequently at lower cost, reducing downstream latency to consumers of these datasets.
A common incremental dataset use case is to create daily snapshots of an input dataset, for example to use in longitudinal analysis of user settings stored in a production database.
Incremental datasets are inherently stateful and make use of `INSERT` statements. Thus, care must be taken to ensure that data is not erased or duplicated during the insertion of new rows.
There are 2 parts to an incremental dataset that you must configure:

- The main query, which selects the rows that make up the dataset
- A `WHERE` clause that determines which subset of rows should be processed for each incremental run

The `WHERE` clause is applied to the main query and is used to ensure that only new data is added to the incremental dataset. For example:

```
ts > (SELECT MAX(timestamp) FROM target_table)
```
Assuming there is a source dataset containing timestamped user action events called `weblogs.user_actions`, which is streamed into our warehouse via some other data integration tool:
| timestamp | user_id | action |
| --- | --- | --- |
| 1502920304 | 03940 | create_project |
| 1502930293 | 20492 | logout |
| 1502940292 | 30920 | login |
| ... | ... | ... |
To make a simple incremental copy of this dataset, create a new file called `definitions/example_incremental.sqlx`:

```
config { type: "incremental" }

SELECT timestamp, action
FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
First, the script sets the type of the dataset to `incremental`.

It then specifies a `WHERE` clause using the `when()` and `incremental()` functions:

```
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
This ensures that only rows from the source dataset with a timestamp greater than the latest timestamp that has been processed so far are selected in the incremental query.
Note that `self()` is used here to get the name of the current dataset. Thus the compiled `WHERE` clause will expand to:

```
timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
```
This dataset may not exist in the warehouse yet. That's OK, because the `WHERE` clause will only be added to the final query if the dataset already exists and new data is being inserted into it.
The SQL generated by the above example depends on the warehouse type, but generally follows the same format.
If the dataset doesn't exist yet:
```
CREATE OR REPLACE TABLE default_schema.example_incremental AS
  SELECT timestamp, action
  FROM weblogs.user_actions;
```
When incrementally inserting new rows:
```
INSERT INTO default_schema.example_incremental (timestamp, action)
  SELECT timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
```
If no `uniqueKey` is specified, then the merge condition (e.g. `T.user_id = S.user_id` in the MERGE example below) is set to `false`, causing rows to always be inserted rather than merged.
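As a rough sketch (illustrative only, not Dataform's exact generated SQL, and reusing the example names from above), a merge with a `false` condition behaves like this: no target row ever matches, so every incoming row falls through to the insert branch.

```
-- Illustrative sketch only: with the merge condition hard-coded to false,
-- no existing row is ever matched, so all new rows are simply inserted.
MERGE default_schema.example_incremental T
USING (
  SELECT timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
) S
ON false
WHEN NOT MATCHED THEN
  INSERT (timestamp, action) VALUES (timestamp, action)
```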
If you want to ensure that the output table only ever contains one row per some combination of key columns, you should specify a `uniqueKey`. When `uniqueKey` is specified, if a row arrives whose key matches an existing row's key, then the existing row is overwritten with the new data.
```
config {
  type: "incremental",
  uniqueKey: ["user_id", "action"]
}

SELECT timestamp, user_id, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
If using `uniqueKey` with BigQuery, we recommend that you also set an `updatePartitionFilter` to only consider a subset of records. This helps to optimize costs, since without an `updatePartitionFilter`, BigQuery will scan the whole table to find matching rows.
```
config {
  type: "incremental",
  uniqueKey: ["user_id", "action"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
      "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, user_id, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```
As above, the SQL generated by this example will depend on the warehouse type, but generally follows the same format.
If the dataset doesn't exist yet:
```
CREATE OR REPLACE TABLE default_schema.example_incremental
PARTITION BY DATE(timestamp) AS
  SELECT timestamp, user_id, action
  FROM weblogs.user_actions;
```
When incrementally merging new rows:
```
MERGE default_schema.example_incremental T
USING (
  SELECT timestamp, user_id, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
) S
ON T.user_id = S.user_id AND T.action = S.action
  AND T.timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
WHEN MATCHED THEN
  UPDATE SET timestamp = S.timestamp, user_id = S.user_id, action = S.action
WHEN NOT MATCHED THEN
  INSERT (timestamp, user_id, action) VALUES (timestamp, user_id, action)
```
Incremental datasets can be used to create a daily snapshot of mutable external datasets.
Assuming an external dataset called `productiondb.customers`, you could write the following incremental dataset to create a daily snapshot of that data:
```
config { type: "incremental" }

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings FROM productiondb.customers

${ when(incremental(), `WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`) }
```
By selecting the current date as `snapshot_date`, this effectively appends a dated snapshot of the `productiondb.customers` dataset to the output dataset each day. The `WHERE` clause here prevents more than one insertion per day (which could result in duplicate rows in the output dataset), since any newly inserted rows must have a later `snapshot_date` than any previously existing row in the output.

It's possible to force an incremental dataset to be rebuilt from scratch, using either the command line interface with the `--full-refresh` option or the Run with full refresh option in Dataform Web.
If you need to protect a dataset from ever being rebuilt from scratch, for example if the source data is only temporary, you can mark an incremental dataset as `protected`. This means that Dataform will never delete this dataset, even if a user requests a full refresh.
For example, in `definitions/incremental_example_protected.sqlx`:
```
config {
  type: "incremental",
  protected: true
}
SELECT ...
```
Since incremental datasets are usually timestamp-based, it's a best practice to set up dataset partitioning on the timestamp field to speed up downstream queries.
For more information, check out the BigQuery and Redshift guides.
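For instance, here's a minimal sketch of a partitioned incremental dataset on BigQuery, reusing the `weblogs.user_actions` example from above (the `partitionBy` option is the same one shown earlier; adapt the configuration to your own warehouse and columns):

```
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)"
  }
}

-- Partitioning the output on the event date means downstream queries that
-- filter on timestamp only need to scan the relevant partitions.
SELECT timestamp, user_id, action FROM weblogs.user_actions

${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }
```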