Airflow Cross-DAG Dependencies

Catherine Shen
4 min read · Jul 20, 2021


Everything you need to know about connecting Airflow DAGs.

When creating an Airflow DAG, it is often a good idea to put all related tasks in the same DAG. However, sometimes a DAG becomes too complex and it is necessary to create dependencies between different DAGs. Throughout this guide, we'll walk through three different ways to link Airflow DAGs and compare the trade-offs of each.

Easy way: TriggerDagRunOperator

The Airflow TriggerDagRunOperator is an easy way to implement cross-DAG dependencies.

For example, say you have two DAGs: an upstream DAG and a downstream DAG. You want the downstream DAG to run after task1 in the upstream DAG finishes successfully. In this case, you can simply add a task that uses the TriggerDagRunOperator right after task1 in the upstream DAG.

Below is an example DAG that implements the TriggerDagRunOperator to trigger the downstream DAG after task1 in the upstream DAG is finished.
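A minimal sketch of the upstream DAG (only the DAG and task names come from this post; the schedule and placeholder tasks are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="upstream_dag",
    start_date=datetime(2021, 7, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    task1 = DummyOperator(task_id="task1")

    # Fires a new run of the downstream DAG once task1 succeeds.
    test_trigger_dagrun = TriggerDagRunOperator(
        task_id="test_trigger_dagrun",
        trigger_dag_id="downstream_dag",  # dag_id of the DAG to trigger
    )

    task2 = DummyOperator(task_id="task2")

    task1 >> test_trigger_dagrun >> task2
```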

This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment. You can insert it after any task in your upstream DAG, and one upstream DAG can trigger one or more downstream DAGs.

The TriggerDagRunOperator in the upstream DAG triggers the downstream DAG: the downstream DAG starts as soon as the test_trigger_dagrun step in the upstream DAG finishes.
  • If you want task1 in the downstream DAG to finish before task2 in the upstream DAG starts, add wait_for_completion=True to the TriggerDagRunOperator, as sketched below. If you leave wait_for_completion at its default of False, the upstream DAG will keep running its remaining tasks as soon as the downstream DAG has started.
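The same trigger task with waiting enabled might look like this (the poke_interval value is illustrative):

```python
# Inside the upstream DAG: block here until the triggered downstream run completes.
test_trigger_dagrun = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="downstream_dag",
    wait_for_completion=True,  # task2 will not start until downstream_dag finishes
    poke_interval=30,          # seconds between status checks while waiting
)
```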

Same start date and schedule interval: ExternalTaskSensor

The ExternalTaskSensor approach is not as flexible as the TriggerDagRunOperator, but it is useful when you cannot modify the upstream DAGs and still want to add dependencies between DAGs. The sensor looks up past executions of DAGs and tasks and matches runs that share the same execution_date. Because the execution_date is a specific instant, the DAGs need to run at the same time, or offset from each other by a constant amount of time.

Below is an example DAG that implements the ExternalTaskSensor to trigger the downstream DAG after two upstream DAGs are finished. All three DAGs share the same schedule, '*/10 * * * *'.
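A minimal sketch of the downstream DAG, assuming each upstream DAG ends with a task named task1 (the sensor task ids, timeout, and the final task are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="downstream_dag",
    start_date=datetime(2021, 7, 1),
    schedule_interval="*/10 * * * *",  # same schedule as the upstream DAGs
    catchup=False,
) as dag:
    # One sensor per upstream DAG; each waits for the upstream run that
    # shares this DAG run's execution_date.
    wait_for_upstream_dag1 = ExternalTaskSensor(
        task_id="wait_for_upstream_dag1",
        external_dag_id="upstream_dag1",
        external_task_id="task1",  # or None to wait for the whole DAG run
        timeout=600,
        mode="reschedule",         # free the worker slot while waiting
    )

    wait_for_upstream_dag2 = ExternalTaskSensor(
        task_id="wait_for_upstream_dag2",
        external_dag_id="upstream_dag2",
        external_task_id="task1",
        timeout=600,
        mode="reschedule",
    )

    start_processing = DummyOperator(task_id="start_processing")

    [wait_for_upstream_dag1, wait_for_upstream_dag2] >> start_processing
```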

downstream_dag, upstream_dag1 and upstream_dag2 all have the same schedule
  • Use execution_delta or execution_date_fn if the DAGs run on schedules that are offset by a fixed amount of time (see the sketch below).
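For example, if upstream_dag1 ran 10 minutes earlier than the downstream DAG, the sensor could point at that earlier run (the offset here is hypothetical):

```python
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# Inside the downstream DAG: match the upstream run whose execution_date
# is 10 minutes before this run's execution_date.
wait_for_upstream_dag1 = ExternalTaskSensor(
    task_id="wait_for_upstream_dag1",
    external_dag_id="upstream_dag1",
    external_task_id="task1",
    execution_delta=timedelta(minutes=10),
)
```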

Conditional DAG execution: XCom with BranchPythonOperator

XCom stands for “cross-communication” and allows the exchange of messages or small amounts of data between tasks. You can think of an XCom as an object with a key and a value which is stored in Airflow’s metadata database.

An example of an XCom key and value:
  • The key is the identifier of your XCom; it is used to retrieve the XCom value pushed by a given task.
  • The value is the data stored under that key. This is the information you want to share between tasks (a small example follows).
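As a small illustration of the key/value idea, the DAG below pushes and pulls an XCom explicitly (the dag_id, task ids, and the dataset_status key are made up):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_status(ti):
    # Stored in Airflow's metadata database as key="dataset_status", value="ready".
    ti.xcom_push(key="dataset_status", value="ready")


def pull_status(ti):
    # Retrieve the value by the upstream task id and the key.
    status = ti.xcom_pull(task_ids="push_status_task", key="dataset_status")
    print(f"upstream reported: {status}")


with DAG(
    dag_id="xcom_example_dag",
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    push_status_task = PythonOperator(task_id="push_status_task", python_callable=push_status)
    pull_status_task = PythonOperator(task_id="pull_status_task", python_callable=pull_status)

    push_status_task >> pull_status_task
```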

Use XCom with BranchPythonOperator

There are two main ways to create an XCom in an Airflow DAG.

First, the easiest way to create an XCom from a task is to return a value. In the case of the PythonOperator, returning a value from the python callable automatically creates an XCom.

Second, you can set do_xcom_push=True for a given task so that the operator pushes its result to XCom. Both approaches are sketched below.
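A sketch of both ways, with made-up task ids and an illustrative bash command:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def compute_row_count():
    # Returning a value automatically pushes an XCom with key="return_value".
    return 42


with DAG(
    dag_id="xcom_creation_examples",
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Way 1: return a value from the python callable.
    row_count = PythonOperator(task_id="row_count", python_callable=compute_row_count)

    # Way 2: do_xcom_push=True makes the operator push its result; for a
    # BashOperator that is the last line written to stdout (key="return_value").
    echo_status = BashOperator(
        task_id="echo_status",
        bash_command="echo ready",
        do_xcom_push=True,
    )
```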

In the following example, the upstream DAG publishes a value to XCom with a PythonOperator, and a callback function on the BranchPythonOperator decides which downstream DAG to trigger. The callback reads the XCom using the upstream task_id and returns the id of the task that should run next.
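A minimal sketch of this pattern (the downstream dag_ids and task names are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def publish_result():
    # In a real DAG this would be computed; the returned value is pushed
    # to XCom under key="return_value".
    return "a"


def choose_downstream(ti):
    # Read the upstream task's XCom and return the task_id to follow.
    result = ti.xcom_pull(task_ids="publish_result")
    return "trigger_downstream_a" if result == "a" else "trigger_downstream_b"


with DAG(
    dag_id="upstream_dag",
    start_date=datetime(2021, 7, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    publish = PythonOperator(task_id="publish_result", python_callable=publish_result)

    branch = BranchPythonOperator(task_id="branch", python_callable=choose_downstream)

    # Only the branch returned by choose_downstream runs; the other is skipped.
    trigger_a = TriggerDagRunOperator(
        task_id="trigger_downstream_a", trigger_dag_id="downstream_dag_a"
    )
    trigger_b = TriggerDagRunOperator(
        task_id="trigger_downstream_b", trigger_dag_id="downstream_dag_b"
    )

    publish >> branch >> [trigger_a, trigger_b]
```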

The upstream XCom value decides which downstream DAG to trigger.

Summary

The TriggerDagRunOperator is an ideal option when you have one upstream DAG that needs to trigger one or more downstream DAGs. It is also a good replacement for SubDAGs, and it is the most flexible way to link DAGs.

Use the ExternalTaskSensor when you have a downstream DAG that depends on multiple upstream DAGs, and the DAGs run at the same time or offset from one another by a constant amount of time.

Combining XCom with the BranchPythonOperator lets you trigger downstream DAGs based on the value of an upstream XCom.


Written by Catherine Shen

Software Engineer working on building big data & machine learning platform.
