About Me

My photo
Mumbai, Maharastra, India
He has more than 7.6 years of experience in the software development. He has spent most of the times in web/desktop application development. He has sound knowledge in various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari

Search This Blog

Thursday, October 31, 2024

TriggerDagRunOperator in Apache Airflow

The TriggerDagRunOperator in Apache Airflow is used to programmatically trigger another DAG run from within a DAG. This operator is useful for creating dynamic workflows where the execution of one DAG may depend on the successful completion of another DAG or when you want to kick off a separate workflow based on specific conditions.

How to Use TriggerDagRunOperator

You can use the TriggerDagRunOperator by specifying the dag_id of the target DAG you want to trigger. You can also pass parameters to the triggered DAG if needed.

Example

Here's an example of how to use TriggerDagRunOperator in Airflow:

Main DAG: main_dag.py

python
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from datetime import datetime # Define the main DAG default_args = { 'owner': 'airflow', 'start_date': datetime(2024, 10, 1), } with DAG('main_dag', default_args=default_args, schedule_interval='@daily') as main_dag: start = DummyOperator(task_id='start') # Trigger another DAG named 'secondary_dag' trigger = TriggerDagRunOperator( task_id='trigger_secondary_dag', trigger_dag_id='secondary_dag', # The ID of the DAG to trigger wait_for_completion=True, # Wait for the triggered DAG to complete conf={"key": "value"}, # Optional parameters to pass to the triggered DAG ) end = DummyOperator(task_id='end') start >> trigger >> end

Triggered DAG: secondary_dag.py

python
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime # Define the secondary DAG default_args = { 'owner': 'airflow', 'start_date': datetime(2024, 10, 1), } with DAG('secondary_dag', default_args=default_args, schedule_interval='@daily') as secondary_dag: start_secondary = DummyOperator(task_id='start_secondary') # You can access the `conf` parameter here if needed # For example, if you want to print the passed config from airflow.operators.python_operator import PythonOperator def print_config(**kwargs): conf = kwargs['dag_run'].conf print(f"Received config: {conf}") print_conf = PythonOperator( task_id='print_config', python_callable=print_config, provide_context=True, ) end_secondary = DummyOperator(task_id='end_secondary') start_secondary >> print_conf >> end_secondary

Explanation

  1. Main DAG (main_dag):

    • Contains a DummyOperator task named start to represent the beginning of the DAG.
    • Uses the TriggerDagRunOperator to trigger another DAG named secondary_dag.
    • The wait_for_completion parameter is set to True, meaning the main DAG will wait for the triggered DAG to finish before moving on to the next task (end).
    • Optionally, you can pass a configuration dictionary using the conf parameter, which can be accessed in the triggered DAG.
  2. Triggered DAG (secondary_dag):

    • Contains a DummyOperator to start and end the DAG.
    • A PythonOperator named print_config is used to demonstrate how to access the configuration passed from the main DAG. The configuration can be accessed using kwargs['dag_run'].conf.

Notes

  • Cross-DAG Dependencies: The TriggerDagRunOperator is particularly useful for establishing dependencies across different DAGs in a workflow.
  • Configuration: You can pass any parameters in the conf dictionary that the triggered DAG can read and use as needed.
  • Error Handling: Ensure to handle potential errors in the triggered DAG to avoid failures that could affect the main DAG execution.

This setup allows you to create a dynamic and interconnected workflow in Airflow by triggering one DAG from another.

Post Reference: Vikram Aristocratic Elfin Share

No comments:

Post a Comment