Building incremental tables

Introduction

Incremental tables don't get rebuilt from scratch every time they are run. Instead, only new rows are inserted into the table, according to the conditions that are provided in the table configuration.

Dataform takes care of managing the state, creating tables and generating insert statements for you.

Incremental table purposes

There are a few different use cases for incremental tables, covered below.

Append only for performance

Data sources such as logs or web analytics events are a great fit for incremental tables. When processing this data, you ideally only want to process new records, instead of having to reprocess months or possibly years of data.

Micro-batching for latency

Another benefit of this approach is that pipelines complete more quickly and can be run more frequently at lower cost, reducing latency for downstream consumers of these large tables.

Creating daily snapshots

Another use case of incremental tables is to create a daily snapshot of a dataset, for example enabling you to do longitudinal analysis on user settings in a production database.

Incremental table concepts

Incremental tables are stateful and make use of insert statements. As a result, care must be taken to ensure that data is not erased or duplicated during this process.

There are two parts to an incremental table build that you must configure:

  • The main query that generates new rows
  • A where clause to determine which rows should be processed

Where clause

When inserting new data, the where clause is applied to the main query's results, and is how you make sure only new data is added to the table. This is typically achieved with a clause such as:

timestamp > (select max(timestamp) from target_table)

A simple example

Let's assume we have a source table called weblogs.user_actions, containing timestamped data about user actions on our site, that is streamed into our warehouse via some other data integration tool:

timestamp    user_id   action
1502920304   03940     create_project
1502930293   20492     logout
1502940292   30920     login
...          ...       ...

To make a simple incremental copy of this table, we can create a new file called definitions/example_incremental.sql:

--js type("incremental");
--js where(`timestamp > (select max(timestamp) from ${self()})`);

select timestamp, action
from weblogs.user_actions

Breaking the above down, we first set the type of the table to "incremental" with the type("incremental") configuration statement.

We then specify a where clause using the where(...) configuration statement:

--js where(`timestamp > (select max(timestamp) from ${self()})`);

This makes sure only rows from the source table with a timestamp greater than the latest timestamp we have processed so far are selected in the query.

We use the self() method to get a reference to the current table. The compiled where clause will be expanded to:

timestamp > (select max(timestamp) from default_schema.example_incremental)

This table may not exist yet - that's OK, as the where clause is only applied when the table already exists and new rows are being inserted.

When inserting into incremental tables, only fields that already exist in the destination table will be written. To make sure new fields are written after the query has changed, the table must be rebuilt from scratch with the --full-refresh option enabled.
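
If you run your project with the Dataform command line interface, a full refresh can be requested by passing the --full-refresh flag to the run command (a minimal sketch; run this from your project directory):

dataform run --full-refresh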

Generated SQL

The SQL commands generated by the above will depend on the warehouse type, but generally follow the same format.

When the table doesn't exist

create or replace table default_schema.example_incremental as
  select timestamp, action
  from weblogs.user_actions;

When inserting new rows incrementally

insert into default_schema.example_incremental (timestamp, action)
  select timestamp, action
  from weblogs.user_actions
  where timestamp > (select max(timestamp) from default_schema.example_incremental);

Daily snapshots with incremental tables

Incremental tables can be used in other ways, for example to create a daily snapshot of mutable external tables.

Imagine you have an external table called productiondb.customers that is replicated into our warehouse via another mechanism, and that we would like to snapshot on a daily basis.

You can write the following incremental table definition to create a daily snapshot of that data:

--js type("incremental");
--js where(`snapshot_date > (select max(snapshot_date) from ${self()})`);
--js descriptor(["snapshot_date", "customer_id", "name", "account_settings", ...]);
select
  current_date() as snapshot_date,
  customer_id,
  name,
  account_settings
from productiondb.customers

Because the current date is selected as the snapshot date, and the where clause filters out any rows that are not newer than the latest snapshot, this effectively puts one, and only one, copy of the customer data into the output table each day.

This Dataform project must be scheduled to run at least once a day to make sure we keep generating snapshots.
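
Once a few days of snapshots have accumulated, longitudinal analysis becomes a simple aggregation over snapshot_date. For example, to track how the number of customers changes over time (a hedged sketch; default_schema.example_snapshots stands in for wherever the table above is created):

-- Count the customers present in each daily snapshot.
select snapshot_date, count(*) as customer_count
from default_schema.example_snapshots
group by snapshot_date
order by snapshot_date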

Protecting incremental tables from data loss

It's possible to force an incremental table to be rebuilt from scratch by providing the --full-refresh option, either via the command line interface or via https://dataform.co.

For incremental tables that cannot be rebuilt from scratch - for example, if the source data is only temporary, or expires at some point - you can mark the table as protected. This means Dataform will never delete this table, even if a user asks for a full refresh.

definitions/incremental_example_protected.sql:

--js type("incremental");
--js protected();
select ...

Table partitioning / dist keys

As incremental tables are usually timestamp based, it's best practice to set up table partitioning on the timestamp field, to speed up downstream queries.
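
For example, on BigQuery, if the example_incremental table above were partitioned by date(timestamp), a downstream query that filters on that field only scans the matching partitions (an illustrative query; it assumes the timestamp column is stored as a BigQuery TIMESTAMP):

-- Only the partitions covering the last 7 days are scanned.
select action, count(*) as action_count
from default_schema.example_incremental
where timestamp >= timestamp_sub(current_timestamp(), interval 7 day)
group by action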

For more information, check out the BigQuery and Redshift guides.