About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, spent mostly on web and desktop application development, with sound knowledge of various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari

Thursday, October 31, 2024

FileSensor operator in Apache Airflow

The FileSensor operator in Apache Airflow is used to monitor the existence of a specific file in a directory. It pauses the execution of a task until the specified file is detected. This is especially useful for workflows that depend on data files being available before processing begins.
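Conceptually, a sensor in poke mode is just a polling loop: check for a condition, sleep, and repeat until success or timeout. The plain-Python sketch below illustrates that idea in isolation (it is a simplified illustration, not the actual FileSensor implementation, which integrates with Airflow's scheduler and connection system):

```python
import os
import time

def wait_for_file(filepath, poke_interval=60, timeout=60 * 60 * 2):
    """Simplified sketch of poke-mode sensing: poll for the file
    until it appears or the timeout elapses."""
    waited = 0
    while not os.path.exists(filepath):
        if waited >= timeout:
            raise TimeoutError(f"{filepath} did not appear within {timeout}s")
        time.sleep(poke_interval)
        waited += poke_interval
    return True
```

The real FileSensor adds scheduler integration on top of this loop, but the success/timeout semantics are the same.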

Example: Using FileSensor to Wait for a File

Suppose we have a DAG that processes data once a CSV file is available in a specific directory. We’ll use FileSensor to check if the file exists before running any data processing.

Example DAG with FileSensor

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

# Define file path to monitor
file_path = "/path/to/your/file/data.csv"

# Define the DAG
with DAG(
    dag_id="file_sensor_example_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Define the FileSensor to check for the file's existence
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath=file_path,        # Path of the file to wait for
        fs_conn_id="fs_default",   # File system connection ID
        poke_interval=60,          # Check every 60 seconds
        timeout=60 * 60 * 2,       # Time out after 2 hours if the file isn't found
        mode="poke",               # Use "poke" mode for frequent checking
    )

    # Define a data processing task
    def process_data():
        print("Data file found. Processing data...")

    data_processing = PythonOperator(
        task_id="data_processing",
        python_callable=process_data,
    )

    # Set task dependencies
    wait_for_file >> data_processing
```

Explanation of the DAG

  1. FileSensor:

    • The wait_for_file task uses FileSensor to wait until the file specified by filepath (data.csv) exists.
    • fs_conn_id="fs_default" is the connection ID for the file system, set in Airflow's Connections UI. This can be configured to point to different file systems or directories as needed.
    • poke_interval=60 makes the sensor check every 60 seconds to see if the file exists.
    • timeout=60 * 60 * 2 (2 hours) specifies that the sensor should stop waiting after 2 hours if the file has not appeared.
    • mode="poke" keeps the task slot occupied while checking, though mode="reschedule" is an alternative that frees the slot between checks.
  2. Data Processing Task:

    • The data_processing task runs only after the FileSensor detects the file, so data processing never starts before the file is available.
  3. Dependencies:

    • The FileSensor task must succeed before data_processing runs, ensuring a file dependency in the workflow.

Benefits

  • Dependency Management: Ensures that tasks depending on file availability only run when the file exists.
  • Flexible Monitoring: Can be configured to check at different intervals and set timeouts for missing files.
  • Efficient Scheduling: With mode="reschedule", the sensor can save resources by not blocking worker slots during the waiting period.
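Switching to reschedule mode only changes the sensor's arguments. A brief sketch, reusing the same hypothetical file path and connection ID as the example DAG above (this would normally sit inside the DAG context):

```python
from airflow.sensors.filesystem import FileSensor

# In "reschedule" mode the task gives up its worker slot between checks,
# which suits long waits at the cost of a little scheduling overhead.
wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/your/file/data.csv",
    fs_conn_id="fs_default",
    poke_interval=300,       # longer intervals are typical in reschedule mode
    timeout=60 * 60 * 2,
    mode="reschedule",
)
```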
Post Reference: Vikram Aristocratic Elfin
