Incremental datasets

Introduction

Incremental datasets aren't rebuilt from scratch every time they run. Instead, only new rows are inserted into the dataset according to the conditions you provide when configuring the dataset.

Dataform takes care of managing state, creating datasets, and generating INSERT statements for you.

Example: Append only for performance

Web logs or analytics data are great use cases for incremental datasets. For these kinds of data sources you usually only want to process new records instead of having to reprocess old data.

Example: Micro-batching for latency

A major benefit of incremental datasets is that pipelines complete more quickly and can therefore be run more frequently at lower cost, reducing downstream latency to consumers of these datasets.

Example: Creating daily snapshots

A common incremental dataset use case is to create daily snapshots of an input dataset, for example to use in longitudinal analysis of user settings stored in a production database.

Key concepts

Incremental datasets are inherently stateful and make use of INSERT statements. Thus, care must be taken to ensure that data is not erased or duplicated during insertion of new rows.

There are two parts to an incremental dataset that you must configure:

  • The main query that selects all rows
  • A WHERE clause that determines which subset of rows should be processed for each incremental run

WHERE clause

The WHERE clause is applied to the main query and is used to ensure that only new data is added to the incremental dataset. For example:

timestamp > (SELECT MAX(timestamp) FROM target_table)
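To see the effect of such a condition, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for a warehouse. The tables source_table and target_table and their sample rows are made up for illustration, and wrapping the main query in a subquery is only one assumed way of combining the two parts, not necessarily the SQL that Dataform generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_table (timestamp INTEGER, action TEXT);
    CREATE TABLE target_table (timestamp INTEGER, action TEXT);
    INSERT INTO source_table VALUES (100, 'login'), (200, 'logout'), (300, 'login');
    -- The target already holds everything up to timestamp 200.
    INSERT INTO target_table VALUES (100, 'login'), (200, 'logout');
""")

main_query = "SELECT timestamp, action FROM source_table"
where_clause = "timestamp > (SELECT MAX(timestamp) FROM target_table)"

# Only rows newer than the latest row in the target pass the filter.
new_rows = conn.execute(
    f"SELECT * FROM ({main_query}) WHERE {where_clause}"
).fetchall()
print(new_rows)  # [(300, 'login')]
```

If target_table were empty, MAX(timestamp) would be NULL and the comparison would match no rows, so a condition like this is only useful once the target holds data.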

A simple example

Assuming there is a source dataset containing timestamped user action events called weblogs.user_actions which is streamed into our warehouse via some other data integration tool:

timestamp    user_id  action
1502920304   03940    create_project
1502930293   20492    logout
1502940292   30920    login
...          ...      ...

To make a simple incremental copy of this dataset, create a new file called definitions/example_incremental.sql:

--js type("incremental");
--js where(`timestamp > (SELECT MAX(timestamp) FROM ${self()})`);

SELECT timestamp, action
FROM weblogs.user_actions

First the script sets the type of the dataset to incremental.

It then specifies a WHERE clause using the where() configuration statement:

--js where(`timestamp > (SELECT MAX(timestamp) FROM ${self()})`);

This ensures that only rows from the source dataset with a timestamp greater than the latest timestamp that has been processed so far are selected in the incremental query.

Note that the self() method is used here in order to get the name of the current dataset. Thus the compiled WHERE clause will be expanded to:

timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)

This dataset may not exist in the warehouse yet. That's OK, because the WHERE clause will only be added to the final query if the dataset already exists and new data is being inserted into it.

Note that when data is inserted into an incremental dataset, only fields that already exist in the dataset will be written. To make sure that new fields are written after changing the query, the dataset must be rebuilt from scratch with the --full-refresh option (if using the Dataform tool on the command line) or with the Run with full refresh option in Dataform Web.

Generated SQL

The SQL generated by the above example depends on the warehouse type, but generally follows the same format.

If the dataset doesn't exist yet:

CREATE OR REPLACE TABLE default_schema.example_incremental AS
  SELECT timestamp, action
  FROM weblogs.user_actions;

When incrementally inserting new rows:

INSERT INTO default_schema.example_incremental (timestamp, action)
  SELECT timestamp, action
  FROM weblogs.user_actions
  WHERE timestamp > (SELECT MAX(timestamp) FROM default_schema.example_incremental)
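The two cases can be tried out locally with a small simulation. The sketch below uses Python's built-in sqlite3 module in place of a real warehouse. The run() and table_exists() helpers, the unqualified table names, and the sample rows are hypothetical, and the branching is an approximation of the behavior described here rather than Dataform's actual implementation.

```python
import sqlite3

TARGET = "example_incremental"
MAIN_QUERY = "SELECT timestamp, action FROM user_actions"
WHERE = f"timestamp > (SELECT MAX(timestamp) FROM {TARGET})"


def table_exists(conn, name):
    """Check SQLite's catalog for a table with the given name."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    return row is not None


def run(conn):
    """Create the target on the first run, append only new rows afterwards."""
    if not table_exists(conn, TARGET):
        conn.execute(f"CREATE TABLE {TARGET} AS {MAIN_QUERY}")
    else:
        conn.execute(
            f"INSERT INTO {TARGET} (timestamp, action) {MAIN_QUERY} WHERE {WHERE}"
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_actions (timestamp INTEGER, user_id TEXT, action TEXT)")
conn.executemany(
    "INSERT INTO user_actions VALUES (?, ?, ?)",
    [(1502920304, "03940", "create_project"), (1502930293, "20492", "logout")],
)

run(conn)  # first run: the target is created with both rows
conn.execute("INSERT INTO user_actions VALUES (1502940292, '30920', 'login')")
run(conn)  # second run: only the new row is inserted
run(conn)  # third run: nothing new in the source, so nothing is inserted

rows = conn.execute(
    f"SELECT timestamp, action FROM {TARGET} ORDER BY timestamp"
).fetchall()
print(rows)
```

After the three runs the target holds each source row exactly once, which is the property the WHERE clause is meant to guarantee.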

Daily snapshots with incremental datasets

Incremental datasets can be used to create a daily snapshot of mutable external datasets.

Assuming an external dataset called productiondb.customers, you could write the following incremental dataset to create a daily snapshot of that data:

--js type("incremental");
--js where(`snapshot_date > (SELECT MAX(snapshot_date) FROM ${self()})`);

SELECT CURRENT_DATE() AS snapshot_date, customer_id, name, account_settings
FROM productiondb.customers

  • Selecting the current date as snapshot_date effectively appends a dated snapshot of the productiondb.customers dataset to the output dataset each day.
  • The WHERE clause here prevents more than one insertion per day (which could result in duplicate rows in the output dataset), since any newly inserted rows must have a later snapshot_date than any previously existing row in the output.
  • This Dataform project should be scheduled to run at least once a day.
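The once-per-day behavior described in the points above can be checked with a small simulation. This is a sketch only: it uses Python's built-in sqlite3 module instead of a warehouse, the run_snapshot() helper, the customers_snapshot table, and the sample customers are hypothetical, and the date is passed in as a parameter (rather than read from CURRENT_DATE()) so the runs are repeatable. Wrapping the main query in a subquery before applying the WHERE clause is an assumption, made so the filter can refer to the snapshot_date alias.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id INTEGER, name TEXT, account_settings TEXT);
    INSERT INTO customers VALUES (1, 'Ada', 'dark_mode'), (2, 'Bo', 'light_mode');
    CREATE TABLE customers_snapshot (
        snapshot_date TEXT, customer_id INTEGER, name TEXT, account_settings TEXT
    );
""")

MAIN_QUERY = (
    "SELECT ? AS snapshot_date, customer_id, name, account_settings FROM customers"
)


def run_snapshot(conn, today):
    """Append a dated copy of customers unless one exists for today or later."""
    is_empty = conn.execute("SELECT COUNT(*) FROM customers_snapshot").fetchone()[0] == 0
    if is_empty:
        # First run: no WHERE clause, mirroring the initial build of the dataset.
        conn.execute(f"INSERT INTO customers_snapshot {MAIN_QUERY}", (today,))
    else:
        conn.execute(
            f"INSERT INTO customers_snapshot "
            f"SELECT * FROM ({MAIN_QUERY}) "
            f"WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM customers_snapshot)",
            (today,),
        )


run_snapshot(conn, "2024-01-01")  # first snapshot: 2 rows
run_snapshot(conn, "2024-01-01")  # same day again: filtered out, no duplicates
run_snapshot(conn, "2024-01-02")  # next day: 2 more rows

total = conn.execute("SELECT COUNT(*) FROM customers_snapshot").fetchone()[0]
dates = [
    r[0]
    for r in conn.execute(
        "SELECT DISTINCT snapshot_date FROM customers_snapshot ORDER BY snapshot_date"
    )
]
print(total, dates)  # 4 ['2024-01-01', '2024-01-02']
```

The dates are stored as ISO 8601 strings, which compare correctly as text; a warehouse with a native DATE type would not need that detail.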

Protecting incremental datasets from data loss

It's possible to force an incremental dataset to be rebuilt from scratch, either with the --full-refresh option on the command line or with the Run with full refresh option in Dataform Web.

If you need to protect a dataset from ever being rebuilt from scratch, for example if the source data is only temporary, you can mark an incremental dataset as protected(). This means that Dataform will never delete this dataset, even if a user requests a full refresh.

definitions/incremental_example_protected.sql:

--js type("incremental");
--js protected();
SELECT ...
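One way to picture how protection interacts with a full refresh is as a small decision function. The sketch below is an interpretation of the behavior described in this section, not Dataform's code: plan_run() and the action names "create", "rebuild", and "insert" are hypothetical.

```python
def plan_run(dataset_exists: bool, full_refresh: bool, protected: bool) -> str:
    """Return the action a run would take for an incremental dataset."""
    if not dataset_exists:
        # Nothing to protect yet: build the dataset from the main query.
        return "create"
    if full_refresh and not protected:
        # An unprotected dataset is dropped and rebuilt on request.
        return "rebuild"
    # Normal incremental run, or a full refresh request on a protected dataset.
    return "insert"


for flags in [
    (False, False, False),
    (True, False, False),
    (True, True, False),
    (True, True, True),
]:
    print(flags, "->", plan_run(*flags))
```

The last case is the one that matters here: with protection enabled, a full refresh request falls through to an ordinary incremental insert, so existing rows are kept.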

Table partitioning / dist keys

Since incremental datasets are usually timestamp based, it's best practice to set up dataset partitioning on the timestamp field to speed up downstream queries.

For more information, check out the BigQuery and Redshift guides.