close

How to create a conditional task in Airflow

Hello Guys, How are you all? Hope You all Are Fine. Today We Are Going To learn about How to create a conditional task in Airflow in Python. So Here I am Explain to you all the possible Methods here.

Without wasting your time, Let’s start This Article.

Table of Contents

How to create a conditional task in Airflow?

  1. How to create a conditional task in Airflow?

    All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.

  2. create a conditional task in Airflow

    All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.

Method 1

You have to use airflow trigger rules

All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.

The trigger rule possibilities:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Here is the idea to solve your problem:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)

Summery

It’s all About this issue. Hope all Methods helped you a lot. Comment below Your thoughts and your queries. Also, Comment below which Method worked for you? Thank You.

Also, Read