About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, most of it spent on web and desktop application development. He has sound knowledge of various database concepts. You can reach him at viki.keshari@gmail.com or on LinkedIn (https://www.linkedin.com/in/vikrammahapatra/), Twitter (https://twitter.com/VikramMahapatra) and Facebook (http://www.facebook.com/viki.keshari).

Thursday, October 31, 2024

BranchPythonOperator in Apache Airflow

The BranchPythonOperator in Apache Airflow allows you to direct your workflow down different paths based on conditions you set in a Python function. It’s especially useful when you need to implement conditional logic within your DAG to decide which tasks to execute next.

How BranchPythonOperator Works

  1. Define a Branching Function: The function you pass to BranchPythonOperator returns the task_id (or a list of task_ids) of the task(s) directly downstream that should run next.
  2. Skipping Unselected Branches: Only the selected branch(es) execute; every other directly downstream task is automatically marked as skipped, and with the default trigger rule that skip carries on to the tasks after it.
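The two rules above can be modelled in plain Python. This is a simplified sketch, not Airflow's own code: `choose_branches` and `split_downstream` are hypothetical names, and the task ids `load_data`, `send_report` and `archive` are made up for illustration. It shows a callable returning either one task_id or a list, and how the directly downstream tasks split into "run" and "skipped".

```python
from __future__ import annotations


def choose_branches(hour: int) -> str | list[str]:
    """Hypothetical branching callable: return one task_id or a list of task_ids."""
    if hour < 12:
        return ["load_data", "send_report"]  # follow two paths at once
    return "load_data"  # follow a single path


def split_downstream(downstream: list[str], chosen: str | list[str]) -> tuple[set[str], set[str]]:
    """Simplified model of the skip rule: the chosen direct downstream tasks run,
    every other direct downstream task is marked skipped."""
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    run = {task_id for task_id in downstream if task_id in chosen_ids}
    skipped = {task_id for task_id in downstream if task_id not in chosen_ids}
    return run, skipped


if __name__ == "__main__":
    downstream = ["load_data", "send_report", "archive"]
    run, skipped = split_downstream(downstream, choose_branches(9))
    print(sorted(run), sorted(skipped))  # ['load_data', 'send_report'] ['archive']
```

The model stops at the direct downstream tasks; in Airflow the skip also propagates further down the graph under the default trigger rule.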

Branching Based on Weekday or Weekend

This example uses BranchPythonOperator to send the workflow down one of two paths, depending on whether the current day is a weekday or a weekend.

```python
from datetime import datetime

import pendulum
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

# Define the DAG
with DAG(
    dag_id="weekday_weekend_branching_dag",
    start_date=datetime(2023, 10, 1),
    schedule="@daily",  # Airflow 2.4+; on older releases use schedule_interval="@daily"
    catchup=False,
) as dag:

    # Branching function: returns the task_id of the path to follow
    def weekday_or_weekend():
        # weekday() follows Python's datetime convention: Monday is 0, Sunday is 6.
        # (pendulum 2.x's day_of_week counts Sunday as 0, so it is not used here.)
        current_day = pendulum.now().weekday()
        if current_day < 5:  # Monday to Friday
            return "weekday_task"
        return "weekend_task"  # Saturday and Sunday

    # BranchPythonOperator decides which downstream task runs
    # (provide_context is no longer needed in Airflow 2)
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=weekday_or_weekend,
    )

    # Callables for the weekday and weekend paths
    def weekday_task_func():
        print("Today is a weekday. Running weekday task.")

    def weekend_task_func():
        print("Today is a weekend. Running weekend task.")

    # Task for the weekday path
    weekday_task = PythonOperator(
        task_id="weekday_task",
        python_callable=weekday_task_func,
    )

    # Task for the weekend path
    weekend_task = PythonOperator(
        task_id="weekend_task",
        python_callable=weekend_task_func,
    )

    # The branch fans out to both paths; only the selected one runs
    branching >> [weekday_task, weekend_task]
```

Explanation

  1. Branching Logic:

    • The function weekday_or_weekend() checks the current day of the week using pendulum.now().weekday(), which returns 0 for Monday through 6 for Sunday.
    • It returns "weekday_task" for Monday to Friday, otherwise "weekend_task".
  2. BranchPythonOperator:

    • The branching task uses the weekday_or_weekend function to decide the path.
    • Based on the returned task_id, it lets either weekday_task or weekend_task run.
  3. Skipping Unchosen Tasks:

    • Only the task with the matching task_id will run, while the other task will be marked as skipped.
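A common follow-up is a task that runs after both branches. Because one branch is always skipped, such a task never runs under the default all_success trigger rule; in Airflow you would give it `trigger_rule="none_failed_min_one_success"` (available in Airflow 2.2 and later). The sketch below is a simplified model of those two rules, not Airflow's implementation; `join_runs` is a hypothetical name.

```python
from __future__ import annotations


def join_runs(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Simplified model of two Airflow trigger rules (not Airflow's own code)."""
    if trigger_rule == "all_success":
        # Every upstream must have succeeded; a skipped upstream blocks the task
        # (in Airflow the task is then marked skipped too).
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        # No upstream failed, and at least one upstream succeeded.
        no_failures = not any(state in ("failed", "upstream_failed") for state in upstream_states)
        return no_failures and any(state == "success" for state in upstream_states)
    raise ValueError(f"rule not modelled here: {trigger_rule}")


if __name__ == "__main__":
    # After the branch on a Monday: weekday_task succeeded, weekend_task was skipped.
    states_after_branch = ["success", "skipped"]
    print(join_runs("all_success", states_after_branch))  # False
    print(join_runs("none_failed_min_one_success", states_after_branch))  # True
```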

Output Example

If this DAG is executed on a Monday:

  • Output (in the weekday_task log): "Today is a weekday. Running weekday task."
  • Skipped: The weekend_task
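To check the branch decision without waiting for a Monday, the weekday/weekend rule can be pulled out into a function that takes a date. This is a sketch under that assumption: `pick_branch` is a hypothetical helper, and weekday_or_weekend() could call it as `pick_branch(pendulum.now().date())`.

```python
from datetime import date


def pick_branch(day: date) -> str:
    """Hypothetical helper: map a calendar date to the task_id to follow.

    date.weekday() returns 0 for Monday through 6 for Sunday.
    """
    return "weekday_task" if day.weekday() < 5 else "weekend_task"


if __name__ == "__main__":
    # 28 October 2024 was a Monday; 26 October 2024 was a Saturday.
    print(pick_branch(date(2024, 10, 28)))  # weekday_task
    print(pick_branch(date(2024, 10, 26)))  # weekend_task
```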
Post Reference: Vikram Aristocratic Elfin
