Calls to the Dataform Web API are authenticated using API tokens. These can be created from the Dataform project's settings page.
When calling the API, pass your API token in an Authorization header, using the bearer token format. For example, with curl, the header can be provided as follows:
    curl -H "Authorization: Bearer 5235783003799552|s+N8gAs72qbi90pFEv7yW/KBImTshRdBoVKjjFA7lD0=|1" https://api.dataform.co/v1/project/1234/run/5678
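The same header and URL can be assembled in Python. The following is a minimal sketch rather than part of the Dataform API: the helper names `auth_headers` and `run_url` are hypothetical, and the base URL is simply the one used in the curl example.

```python
API_BASE = "https://api.dataform.co/v1"  # base URL taken from the curl example


def auth_headers(api_token: str) -> dict:
    """Return the Authorization header in bearer token format."""
    return {"Authorization": f"Bearer {api_token}"}


def run_url(project_id: str, run_id: str = "") -> str:
    """Return the runs URL for a project, or one run's URL if run_id is given."""
    url = f"{API_BASE}/project/{project_id}/run"
    return f"{url}/{run_id}" if run_id else url
```

In practice the token would usually be read from an environment variable or a secret store rather than written into the script.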
Runs can be created by making a POST call to the RunCreate method. For detailed documentation on supported parameters, see the RunCreate reference documentation.
For example, to create a run for project ID 1234 that triggers a specific schedule by name:
    curl -H "Authorization: Bearer 5235783003799552|s+N8gAs72qbi90pFEv7yW/KBImTshRdBoVKjjFA7lD0=|1" -X POST -d '{ "scheduleName": "some_schedule" }' https://api.dataform.co/v1/project/1234/run
This returns a RunGetResponse that includes the created run's ID:
    {
      "id": "1029591293203"
    }
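For readers who prefer Python to curl, the create call can be sketched with the standard library alone. This is an illustration under assumptions: `build_run_create_request` and `created_run_id` are hypothetical helper names, and the request body carries only the `scheduleName` field shown in the curl example.

```python
import json
import urllib.request


def build_run_create_request(project_id: str, api_token: str,
                             schedule_name: str) -> urllib.request.Request:
    """Build (but do not send) a POST request that triggers the named schedule."""
    body = json.dumps({"scheduleName": schedule_name}).encode("utf-8")
    return urllib.request.Request(
        url=f"https://api.dataform.co/v1/project/{project_id}/run",
        data=body,
        headers={"Authorization": f"Bearer {api_token}"},
        method="POST",
    )


def created_run_id(response_body: str) -> str:
    """Extract the run ID from the JSON body returned by the create call."""
    return json.loads(response_body)["id"]
```

The request object can then be sent with `urllib.request.urlopen(request)`, and the decoded response body passed to `created_run_id`.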
After creating a run, its status can be checked with the RunGet method. This is a GET request to the appropriate path, for example for project ID 1234 and run ID 5678:
    curl -H "Authorization: Bearer 5235783003799552|s+N8gAs72qbi90pFEv7yW/KBImTshRdBoVKjjFA7lD0=|1" https://api.dataform.co/v1/project/1234/run/5678
This returns a RunGetResponse such as:
    {
      "id": "5678",
      "status": "RUNNING",
      "runLogUrl": "https://app.dataform.co/#/1234/run/5678"
    }
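A response like this can be turned into a small typed object before it is inspected. The sketch below is only one possible shape: `RunInfo`, `parse_run`, and `is_running` are hypothetical names, and the only status value assumed is `RUNNING`, since that is the only one shown here.

```python
import json
from dataclasses import dataclass


@dataclass
class RunInfo:
    id: str
    status: str
    run_log_url: str


def parse_run(response_body: str) -> RunInfo:
    """Parse the JSON body of a RunGet call into a RunInfo."""
    data = json.loads(response_body)
    # runLogUrl is treated as optional here; that is an assumption.
    return RunInfo(id=data["id"], status=data["status"],
                   run_log_url=data.get("runLogUrl", ""))


def is_running(run: RunInfo) -> bool:
    """Return True while the run reports the RUNNING status."""
    return run.status == "RUNNING"
```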
Using the REST API, it's possible to trigger Dataform schedules from a third-party orchestration tool, such as Airflow or Luigi.
The following example, written in Python, shows how you can trigger a schedule, check its status every 10 seconds, and exit when the schedule finishes.
    import json
    import time

    import requests

    base_url = 'https://api.dataform.co/v1/project/<PROJECT_ID>/run'
    headers = {'Authorization': 'Bearer <API_TOKEN>'}
    run_create_request = {"environmentName": "<ENVIRONMENT_NAME>", "scheduleName": "<SCHEDULE_NAME>"}

    # Create the run by triggering the schedule.
    response = requests.post(base_url, data=json.dumps(run_create_request), headers=headers)

    # Build the URL of the created run from the returned run ID.
    run_url = base_url + '/' + response.json()['id']

    # Poll the run's status every 10 seconds until it is no longer RUNNING.
    response = requests.get(run_url, headers=headers)

    while response.json()['status'] == 'RUNNING':
        time.sleep(10)
        response = requests.get(run_url, headers=headers)
        print(response.json())
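When a script like this runs inside an orchestrator, it can help to bound how long it waits. The following sketch wraps the polling loop in a function with a timeout. It is an illustration, not part of the Dataform API: `wait_for_run`, `fetch_status`, and the default timeout are hypothetical, and as in the example above any status other than `RUNNING` is treated as finished.

```python
import time
from typing import Callable


def wait_for_run(fetch_status: Callable[[], str],
                 poll_seconds: float = 10.0,
                 timeout_seconds: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the status is no longer RUNNING and return the final status.

    Raises TimeoutError if the run is still RUNNING once the timeout has passed.
    """
    deadline = time.monotonic() + timeout_seconds
    status = fetch_status()
    while status == "RUNNING":
        if time.monotonic() >= deadline:
            raise TimeoutError("run still RUNNING after timeout")
        sleep(poll_seconds)
        status = fetch_status()
    return status
```

With the `requests` calls from the example, `fetch_status` could be `lambda: requests.get(run_url, headers=headers).json()['status']`. Passing the status lookup in as a function keeps the loop independent of the HTTP library.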