Using Orbit with Airflow¶
Apache Airflow is a free, open-source workflow management tool. It acts as a scheduler for recurring multistage batch jobs. See the Airflow documentation for more information.
Orbit's flexibility and modularity allow it to integrate easily with an Airflow workflow. We can use Airflow to manage the scheduling of tasks while using Orbit to monitor the quality and integrity of the data flowing into and out of every step.
How does Orbit work with Airflow?¶
An Airflow workflow or pipeline is structured as a Directed Acyclic Graph (DAG). Each node is a task, which typically ingests input from a database or files and outputs data in a similar form for downstream nodes to ingest. Airflow can run these workflows on a recurring schedule or in response to triggers.
A (very simple) workflow in Airflow might look like this:
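For illustration, here is a minimal sketch of such a DAG; the DAG name, tasks, and scripts below are made up for this example and are not part of the project:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A toy pipeline with three tasks chained in sequence.
toy_dag = DAG('toy_pipeline', start_date=datetime(2019, 1, 1), schedule_interval='@daily')

ingest = BashOperator(task_id='ingest', bash_command='python ingest.py', dag=toy_dag)
clean = BashOperator(task_id='clean', bash_command='python clean.py', dag=toy_dag)
report = BashOperator(task_id='report', bash_command='python report.py', dag=toy_dag)

# Each task feeds the next: ingest -> clean -> report.
ingest >> clean >> report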
Orbit can also schedule jobs, but its modularity lets us use whichever tools are most relevant. Since we're going to use Airflow to schedule the processing of new data, we can incorporate Orbit Data Contracts to validate the data between every Operator. We can then use the Orbit dashboard to alert us to issues such as anomalies in the raw input sources or in the final preprocessed data, ensuring that the data is ready for the inference model.
Data Contract validation code can simply be added as new nodes in the DAG; each one loads the output of the upstream node, loads the appropriate Data Contract, and validates the data.
Because we are using Airflow to schedule our data processing, we will be using Orbit to monitor data but not as a scheduler.
Example¶
The example referenced in this section is available on GitHub.
In this article we're going to use a simple Airflow DAG that preprocesses incoming data from air quality monitors around Stuttgart, and demonstrate how to use Orbit to monitor the quality and integrity of the data at each step.
Problem¶
The data in this example comes from a municipal air quality project in Stuttgart, Germany. Information on this project is available here, and a portion of the data is easily available from Kaggle.
The data consists of monthly files from two kinds of sensors at many locations around the city: the BME280, which measures air temperature, pressure, and humidity, and the SDS011, which measures particulate matter in the air.
Airflow pipeline¶
Our Airflow DAG preprocesses incoming data from air quality monitors around Stuttgart.
For simplicity we have just two tasks. The first loads the data for each sensor for the current time period, renames columns, removes columns, and casts datatypes. The second loads the output of the first, groups the measurements into consistent time intervals, and joins the two sensors' data by time interval and location.
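As a rough illustration of what the second task does, the grouping and joining might look something like the pandas sketch below; the function and column names (merge_sensor_data, timestamp, location) are stand-ins rather than the project's actual code, and the real pipeline may use different intervals and join keys:

import pandas as pd

def merge_sensor_data(df_bme, df_sds):
    # Group each sensor's measurements into hourly intervals per location,
    # averaging the numeric measurement columns.
    def to_hourly(df):
        df = df.copy()
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        return (df.groupby(['location', pd.Grouper(key='timestamp', freq='H')])
                  .mean(numeric_only=True)
                  .reset_index())

    hourly_bme = to_hourly(df_bme)
    hourly_sds = to_hourly(df_sds)

    # Join the two sensors' measurements on time interval and location.
    return hourly_bme.merge(hourly_sds, on=['location', 'timestamp'], how='inner')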
We're then going to demonstrate how we can use Orbit to monitor the quality and correctness of the data before, between, and after these stages, without any risk of breaking the existing pipeline, because the dependency path to the final output is left unchanged.
Run baselines to create data contract¶
In the code there's a file called baseline.py, which calls the preprocessing functions used in our Airflow tasks in order to create the Data Contracts. It uses the first month of data available as a simulation of training data.
We can run it directly to create Data Contracts fitted to the output of each stage.
python baseline.py
This will create the data contract files: two for the first stage (one per sensor type) and one for the second stage's merged output.
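The specifics live in baseline.py itself, but the core idea is to run the same preprocessing on the baseline month and fit a Data Contract on each output. A minimal sketch, assuming contracts are created from a reference dataframe and saved with DataContract(...).save() in the same way they are later loaded with DataContract.load(); run_stage1, run_stage2, and baseline_month are illustrative stand-ins for the repository's actual helpers:

from foundations import DataContract
from config import *

# Preprocess the first month of data exactly as the Airflow tasks would.
# (run_stage1, run_stage2, and baseline_month are illustrative stand-ins.)
df_bme_stage1, df_sds_stage1 = run_stage1(baseline_month)
df_stage2 = run_stage2(df_bme_stage1, df_sds_stage1)

# Fit a Data Contract on each output and save it to the project path.
DataContract("stage1_bme", df=df_bme_stage1).save(PROJECT_PATH)
DataContract("stage1_sds", df=df_sds_stage1).save(PROJECT_PATH)
DataContract("stage2_df", df=df_stage2).save(PROJECT_PATH)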
Create data contract nodes¶
The first stage outputs two files of a month's sensor data, one per sensor type.
We're going to create a task that loads that data and the appropriate Data Contracts, then uses them to validate the data. Once this task runs, the results will be trackable via the Orbit dashboard.
import sys

from foundations import DataContract
from config import *

# The month to validate is passed in by the Airflow task as a command-line
# argument (see the bash_command added to the DAG below), e.g. "2017-03".
date_string = sys.argv[1]

# Load the output of stage 1 for each sensor type.
df_bme_stage1 = load_data(data_folder / input_bme_filename)
df_sds_stage1 = load_data(data_folder / input_sds_filename)

# Load the Data Contracts created by baseline.py and validate the new data.
stage1_bme_data_contract = DataContract.load(PROJECT_PATH, "stage1_bme")
stage1_sds_data_contract = DataContract.load(PROJECT_PATH, "stage1_sds")

stage1_bme_report = stage1_bme_data_contract.validate(df_bme_stage1, date_string)
stage1_sds_report = stage1_sds_data_contract.validate(df_sds_stage1, date_string)
Save this as stage1_datavalidation.py (the filename referenced by the DAG below).
The second stage outputs a joined and processed dataframe. We can validate it as follows:
import sys

from foundations import DataContract
from config import *

date_string = sys.argv[1]  # month passed in by the Airflow task, e.g. "2017-03"
df = load_data(data_folder / stage2_output_filename)  # placeholder name; load the merged output of stage 2 from the path in config

stage2_merged_data_contract = DataContract.load(PROJECT_PATH, "stage2_df")
stage2_report = stage2_merged_data_contract.validate(df, date_string)
Save this as stage2_datavalidation.py.
Modify the pipeline¶
The sensor_data_pipeline.py file is where the Airflow DAG is defined. See the Airflow documentation for much more detail on how this works.
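For orientation, the relevant parts of that file look roughly like the sketch below; the dag_id, task_ids, and script paths are placeholders, and only the names sensor_dag, t1, and t2 matter for the additions that follow:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

sensor_dag = DAG('sensor_data_pipeline', start_date=datetime(2017, 1, 1), schedule_interval='@monthly')

# t1 preprocesses each sensor's raw files; t2 groups and joins the two outputs.
t1 = BashOperator(
    task_id='Stage1_preprocessing',
    bash_command='python /path/to/stage1_preprocessing.py {{ macros.ds_format(ds, "%Y-%m-%d", "%Y-%m") }} ',
    dag=sensor_dag)

t2 = BashOperator(
    task_id='Stage2_preprocessing',
    bash_command='python /path/to/stage2_preprocessing.py {{ macros.ds_format(ds, "%Y-%m-%d", "%Y-%m") }} ',
    dag=sensor_dag)

t2 << t1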
We'll need to add
t1_dc = BashOperator(
    task_id='Stage1_datavalidation',
    bash_command='python /Users/sr/projects/airflow-orbit-project/airflow-orbit/stage1_datavalidation.py {{ macros.ds_format(ds, "%Y-%m-%d", "%Y-%m") }} ',
    dag=sensor_dag)
and
t2_dc = BashOperator(
    task_id='Stage2_datavalidation',
    bash_command='python /Users/sr/projects/airflow-orbit-project/airflow-orbit/stage2_datavalidation.py {{ macros.ds_format(ds, "%Y-%m-%d", "%Y-%m") }} ',
    dag=sensor_dag)
We want to add these downstream of their respective tasks. To do this we can add the following lines
t1_dc << t1
t2_dc << t2
Importantly, t2 does not depend on t1_dc: t1_dc is a child node of t1, and nothing further depends on it. Likewise, nothing depends on t2_dc. This means the dependency path to the final output of the existing Airflow pipeline is unmodified, making the incorporation of Orbit monitoring in this way safe for mature and complex Airflow workflows.
Run the pipeline¶
Now we can use Airflow to unpause our modified pipeline. Once it runs, we can monitor our Data Contracts on the Orbit dashboard.