About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, most of it spent on web and desktop application development, and a sound knowledge of database concepts. You can reach him at viki.keshari@gmail.com, https://www.linkedin.com/in/vikrammahapatra/, https://twitter.com/VikramMahapatra, or http://www.facebook.com/viki.keshari.


Thursday, October 31, 2024

ExternalTaskSensor in Apache Airflow

The ExternalTaskSensor in Apache Airflow waits for a task in a different DAG (Directed Acyclic Graph) to complete before letting downstream tasks proceed. It is useful when tasks in one DAG depend on tasks in another.

How to Use ExternalTaskSensor

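To use the ExternalTaskSensor, pass the ID of the DAG that contains the task you want to wait for (external_dag_id) and the ID of that task (external_task_id). By default, the sensor checks the external task's run for the same logical date (execution date) as the sensor's own run. To check a different date, use execution_delta or execution_date_fn.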
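It helps to be able to predict which external run a given sensor run will look at. The helper below is a hypothetical, pure-Python model of the date-matching rule. It is not Airflow code. With no execution_delta, the target is the sensor run's own logical date. Otherwise, the target is that date minus execution_delta.

```python
from datetime import datetime, timedelta
from typing import Optional


def target_logical_date(
    logical_date: datetime, execution_delta: Optional[timedelta] = None
) -> datetime:
    """Hypothetical model of which external run ExternalTaskSensor checks.

    No execution_delta: the external run with the same logical date.
    Otherwise: the external run at logical_date - execution_delta.
    """
    if execution_delta is None:
        return logical_date
    return logical_date - execution_delta


# Two '@daily' DAGs: dag_two's 2024-10-05 run waits on dag_one's 2024-10-05 run.
same_day = target_logical_date(datetime(2024, 10, 5))
print(same_day)  # 2024-10-05 00:00:00

# Assumed scenario: dag_two scheduled at 02:00 and dag_one at 01:00.
# execution_delta=timedelta(hours=1) points each dag_two run one hour back.
shifted = target_logical_date(datetime(2024, 10, 5, 2), timedelta(hours=1))
print(shifted)  # 2024-10-05 01:00:00
```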

Example

Here's a basic example of how to use ExternalTaskSensor in an Airflow DAG:

Setup

  1. DAG 1: The first DAG, which contains the task that will be waited on.
  2. DAG 2: The second DAG, which includes the ExternalTaskSensor.

DAG 1: dag_one.py

```python
from datetime import datetime

from airflow import DAG
# EmptyOperator replaces the deprecated DummyOperator (Airflow 2.3+)
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

# 'schedule' replaces the deprecated 'schedule_interval' argument (Airflow 2.4+)
with DAG('dag_one', default_args=default_args, schedule='@daily') as dag_one:
    start_task = EmptyOperator(task_id='start_task')
    end_task = EmptyOperator(task_id='end_task')  # the task dag_two waits for

    start_task >> end_task
```

DAG 2: dag_two.py

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
# Current import path; airflow.sensors.external_task_sensor is deprecated
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

# Same start_date and schedule as dag_one, so each dag_two run waits for
# the dag_one run with the same logical date.
with DAG('dag_two', default_args=default_args, schedule='@daily') as dag_two:
    wait_for_dag_one = ExternalTaskSensor(
        task_id='wait_for_dag_one',
        external_dag_id='dag_one',
        external_task_id='end_task',  # the task in dag_one to wait for
        mode='poke',                  # or 'reschedule'
        timeout=600,                  # fail after 600 seconds (10 minutes)
        poke_interval=30,             # check every 30 seconds
    )
    next_task = EmptyOperator(task_id='next_task')

    wait_for_dag_one >> next_task
```

Explanation

  1. DAG 1 (dag_one):

    • Contains two dummy tasks: start_task and end_task.
    • The end_task is the one we will wait for in dag_two.
  2. DAG 2 (dag_two):

    • Uses the ExternalTaskSensor to wait for the end_task in dag_one.
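    • The mode can be poke or reschedule. In poke mode, the sensor holds its worker slot for the whole wait and checks every poke_interval seconds. In reschedule mode, it releases the slot between checks and is rescheduled for the next one, which suits long waits.
    • timeout (in seconds) limits how long the sensor waits before failing. The default is seven days. poke_interval (in seconds) controls how often it checks.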
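The interplay of timeout and poke_interval in poke mode can be sketched as a plain polling loop. This is a simplified, hypothetical model for illustration, not Airflow's implementation. The function wait_until and the injectable clock and sleep parameters are assumptions that let the example run instantly. Reschedule mode differs: the task does not sleep in the worker but ends and is scheduled again after poke_interval.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout: float = 600,
    poke_interval: float = 30,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical model of a sensor in poke mode.

    Calls check() every poke_interval seconds until it returns True, and
    raises TimeoutError once more than timeout seconds have elapsed.
    Returns the number of checks made.
    """
    started = clock()
    checks = 0
    while True:
        checks += 1
        if check():
            return checks
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


class FakeClock:
    """Stand-in for time.monotonic/time.sleep: sleeping just advances 'now'."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
results = iter([False, False, False, True])  # end_task "succeeds" on the 4th check
n = wait_until(lambda: next(results), timeout=600, poke_interval=30,
               clock=clock, sleep=clock.sleep)
print(n, clock.now)  # 4 90.0
```

With timeout=600 and poke_interval=30, a condition that never becomes true is checked about twenty times before the wait fails.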

Notes

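  • Both DAGs must run in the same Airflow deployment, because the sensor looks up the external task's state in Airflow's metadata database.
  • The sensor matches runs by logical date, so the two DAGs' start_date and schedule should line up. In this example, both start on 2024-10-01 and run @daily. If they don't line up, use execution_delta or execution_date_fn. Otherwise the sensor waits for a run that never exists and eventually times out.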
  • The external_task_id must match the task_id in the external DAG.
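When the schedules differ, execution_date_fn can be used instead of execution_delta. The two cannot be combined. It is a callable that receives the current run's logical date and returns the logical date or dates to check. If it returns a list, the sensor waits for all of them.

The function below is a hypothetical example. It assumes dag_one were switched to an hourly schedule while dag_two stays daily. Each dag_two run would then wait for all 24 hourly dag_one runs in its day. The sensor wiring is shown only as a comment, so the block runs without Airflow installed.

```python
from datetime import datetime, timedelta
from typing import List


def hourly_runs_for_day(logical_date: datetime, **context) -> List[datetime]:
    """Hypothetical execution_date_fn: the 24 hourly logical dates in the
    day that starts at this daily run's logical date.

    Airflow passes the logical date positionally and may pass other context
    values as keyword arguments; they are ignored here.
    """
    day_start = logical_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return [day_start + timedelta(hours=h) for h in range(24)]


# Inside dag_two (assumed hourly dag_one):
# ExternalTaskSensor(
#     task_id='wait_for_dag_one',
#     external_dag_id='dag_one',
#     external_task_id='end_task',
#     execution_date_fn=hourly_runs_for_day,
# )

dates = hourly_runs_for_day(datetime(2024, 10, 5))
print(len(dates), dates[0], dates[-1])  # 24 2024-10-05 00:00:00 2024-10-05 23:00:00
```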

With this setup, dag_two runs next_task only after end_task in dag_one has completed successfully for the same logical date.

Post Reference: Vikram Aristocratic Elfin
