
How to Trigger a DAG on the Success of Another DAG in Airflow Using Python?

Hello guys, how are you all? Hope you are all fine. Today we are going to learn how to trigger a DAG on the success of another DAG in Airflow using Python. Here I explain all the possible methods.

Without wasting your time, let’s start this article.


Method 1

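Here is a snippet showing reactive triggering with TriggerDagRunOperator (as opposed to the poll-based triggering of ExternalTaskSensor). The trigger task is added as the last task of the first DAG and starts the second DAG once all of its upstream tasks have succeeded.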

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..

# our trigger task; trigger_dag_id is the dag_id of the DAG to be triggered
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               trigger_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run only when all 'tail' tasks have succeeded
tail_tasks_of_first_dag >> my_trigger_task
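
The snippet above leaves my_magic_function_that_determines_all_tail_tasks undefined. Below is a minimal sketch of one way such a helper could look, assuming each task exposes a downstream_list attribute the way Airflow operators do. The names find_tail_tasks and FakeTask are hypothetical; FakeTask only stands in for a real operator so that the example runs without Airflow installed.

```python
from typing import Iterable, List


def find_tail_tasks(tasks: Iterable) -> List:
    """Return the tasks that have no downstream tasks ('tail' tasks)."""
    return [task for task in tasks if not task.downstream_list]


class FakeTask:
    """Hypothetical stand-in for an Airflow operator, used only for this demo."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream_list: List["FakeTask"] = []

    def __rshift__(self, other: "FakeTask") -> "FakeTask":
        # Mimic Airflow's `a >> b` by recording b as downstream of a.
        self.downstream_list.append(other)
        return other


extract, load_a, load_b = FakeTask("extract"), FakeTask("load_a"), FakeTask("load_b")
extract >> load_a
extract >> load_b

# Only the two load tasks have nothing downstream of them.
tail_ids = [task.task_id for task in find_tail_tasks([extract, load_a, load_b])]
print(tail_ids)  # ['load_a', 'load_b']
```

With a real DAG, the same helper could presumably be called as find_tail_tasks(my_dag.tasks) before the trigger task is attached, so that the trigger task itself is not counted as a tail.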

Method 2

I believe you are looking for the SubDagOperator, which runs a DAG inside a bigger DAG. Note that creating many subdags like in the example below gets messy pretty quickly, so I recommend putting each subdag in its own file and importing them in a main file.

The SubDagOperator is simple to use: you give it a task ID, a subdag (the child) and a dag (the parent).

subdag_2 = SubDagOperator(
        task_id="just_some_id",
        subdag=child_subdag,  # this must be a DAG
        dag=parent_dag,  # this must be a DAG
        )
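
One detail worth spelling out: SubDagOperator expects the child DAG's dag_id to be the parent's dag_id and the operator's task_id joined by a dot, which is why the full example builds it with '%s.%s' % (parent_dag_name, child_dag_name). The following is a small sketch of that naming rule in plain Python; the helper names subdag_dag_id and is_valid_subdag_id are hypothetical and not part of Airflow.

```python
def subdag_dag_id(parent_dag_id: str, task_id: str) -> str:
    """Build the dag_id a child DAG needs when wrapped by a SubDagOperator."""
    return f"{parent_dag_id}.{task_id}"


def is_valid_subdag_id(child_dag_id: str, parent_dag_id: str, task_id: str) -> bool:
    """Check that a child dag_id follows the '<parent dag_id>.<task_id>' form."""
    return child_dag_id == subdag_dag_id(parent_dag_id, task_id)


child_id = subdag_dag_id("example_subdag_operator", "run-this-dag-after-previous-steps")
print(child_id)
print(is_valid_subdag_id("just_some_id", "example_subdag_operator", "just_some_id"))
```

Running a check like this while building the child DAG catches a mismatched name before the parent DAG is loaded by the scheduler.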


The following example is adapted from the Airflow examples repository:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago


# factory that builds the child DAG; its dag_id must be '<parent>.<child>'
def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval="@daily",
            )

    for i in range(5):
        DummyOperator(
                task_id='%s-task-%s' % (child_dag_name, i + 1),
                default_args=args,
                dag=dag_subdag,
                )

    return dag_subdag

DAG_NAME = 'example_subdag_operator'

args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
        }

dag = DAG(
        dag_id=DAG_NAME,
        default_args=args,
        schedule_interval="@once",
        tags=['example']
        )

start = DummyOperator(
        task_id='start-of-main-job',
        dag=dag,
        )

some_other_task = DummyOperator(
        task_id='some-other-task',
        dag=dag,
        )


end = DummyOperator(
        task_id='end-of-main-job',
        dag=dag,
        )

# the operator gets its own name so it does not shadow the subdag() factory
subdag_task = SubDagOperator(
        task_id='run-this-dag-after-previous-steps',
        subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
        dag=dag,
        )

start >> some_other_task >> end >> subdag_task

Summary

That’s all about this issue. Hope these methods helped you. Comment below with your thoughts and queries, and let us know which method worked for you. Thank you.
