About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, most of it spent on web and desktop application development, and has sound knowledge of various database concepts. You can reach him at viki.keshari@gmail.com, https://www.linkedin.com/in/vikrammahapatra/, https://twitter.com/VikramMahapatra, or http://www.facebook.com/viki.keshari

Thursday, October 31, 2024

SimpleHttpOperator to call the API endpoint in Apache Airflow

Below is an example of a FastAPI application that exposes a simple API endpoint, together with an Airflow DAG that uses the SimpleHttpOperator to call that endpoint.

Step 1: FastAPI Application

First, create a FastAPI application with a simple endpoint. Save this code in a file named main.py.

python

from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict

app = FastAPI()

class Item(BaseModel):
    name: str
    description: str = None
    price: float
    tax: float = None

@app.post("/items/")
async def create_item(item: Item) -> Dict[str, float]:
    total_price = item.price + (item.tax if item.tax else 0)
    return {"total_price": total_price}

To run the FastAPI app, save this file and run:

bash

uvicorn main:app --host 0.0.0.0 --port 8000

This starts the FastAPI server on http://localhost:8000. The endpoint /items/ accepts POST requests with JSON data containing name, description, price, and tax.
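
Before wiring this into Airflow, you can sanity-check the endpoint with a short client script (a sketch using the third-party requests library; the payload values are just examples):

python
import requests

# Hypothetical smoke test for the /items/ endpoint started above.
payload = {"name": "Widget", "description": "A widget", "price": 10.99, "tax": 1.99}

response = requests.post("http://localhost:8000/items/", json=payload, timeout=10)
response.raise_for_status()

# Expect {"total_price": 12.98}
print(response.json())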

Step 2: Airflow DAG with SimpleHttpOperator

In your Airflow DAG, use SimpleHttpOperator to make a POST request to the FastAPI endpoint. Below is a sample DAG that calls the FastAPI endpoint and logs the response. Save this as call_fastapi_dag.py.


python

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import json

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "call_fastapi_dag",
    default_args=default_args,
    description="A DAG to call FastAPI endpoint",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,
) as dag:
    call_fastapi_task = SimpleHttpOperator(
        task_id="call_fastapi_task",
        http_conn_id="fastapi_connection",
        endpoint="items/",
        method="POST",
        data=json.dumps({"name": "Widget", "description": "A widget", "price": 10.99, "tax": 1.99}),
        headers={"Content-Type": "application/json"},
        response_check=lambda response: response.status_code == 200,
        log_response=True,
    )

    call_fastapi_task

Step 3: Configure the Airflow Connection

  1. Go to the Airflow UI, then to Admin > Connections.
  2. Add a new connection with the following details:
    • Connection Id: fastapi_connection
    • Connection Type: HTTP
    • Host: http://localhost:8000

Explanation

  • FastAPI app: The /items/ endpoint calculates the total price based on price and tax in the request.
  • Airflow DAG: The SimpleHttpOperator sends a POST request to the FastAPI app, with the response being logged in the Airflow task logs.

EmailOperator in Apache Airflow

The EmailOperator in Apache Airflow is used to send emails as part of your workflow. When using Gmail as the email service, you'll need to configure SMTP settings properly to ensure successful email delivery. Below are the steps to set up and use the EmailOperator with Gmail in Airflow.

Step 1: Install Required Libraries

Make sure the required libraries are installed; you may need to add the apache-airflow-providers-smtp provider if it is not already present:

bash
pip install apache-airflow-providers-smtp

Step 2: Configure Airflow to Use Gmail

You need to configure your Airflow instance to send emails using Gmail's SMTP server. This can be done in the airflow.cfg configuration file or by setting environment variables.

Option A: Using airflow.cfg

  1. Locate the airflow.cfg file in your Airflow home directory (usually ~/airflow/).
  2. Update the [email] section with your Gmail SMTP settings:
ini
[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Your Gmail address
smtp_user = your_email@gmail.com
# Your Gmail app password
smtp_password = your_app_password
smtp_port = 587
# Your Gmail address
smtp_mail_from = your_email@gmail.com

Option B: Using Environment Variables

Alternatively, you can set these values using environment variables:

bash
export AIRFLOW__EMAIL__EMAIL_BACKEND=airflow.utils.email.send_email_smtp
export AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
export AIRFLOW__SMTP__SMTP_STARTTLS=True
export AIRFLOW__SMTP__SMTP_SSL=False
export AIRFLOW__SMTP__SMTP_USER=your_email@gmail.com
export AIRFLOW__SMTP__SMTP_PASSWORD=your_app_password
export AIRFLOW__SMTP__SMTP_PORT=587
export AIRFLOW__SMTP__SMTP_MAIL_FROM=your_email@gmail.com

Step 3: Create an App Password (Optional)

If you have two-factor authentication (2FA) enabled on your Google account, you will need to create an App Password to use with Airflow. Follow these steps:

  1. Go to your Google Account settings.
  2. Navigate to Security > Signing in to Google > App passwords.
  3. Create a new App Password for "Mail" and "Other" (you can name it "Airflow").
  4. Use this App Password in the smtp_password configuration.
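
Before pointing Airflow at these credentials, you can optionally verify them with a short standalone script (a sketch using Python's built-in smtplib; the address and App Password below are placeholders):

python
import smtplib

# Placeholder credentials -- substitute your Gmail address and App Password.
SMTP_USER = "your_email@gmail.com"
SMTP_PASSWORD = "your_app_password"

with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:
    server.starttls()                       # Same STARTTLS setting used in airflow.cfg
    server.login(SMTP_USER, SMTP_PASSWORD)  # Raises SMTPAuthenticationError on bad credentials
    print("SMTP login succeeded; Airflow should be able to send mail with these settings.")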

Step 4: Use EmailOperator in a DAG

Now that you've configured your SMTP settings, you can use the EmailOperator in your Airflow DAG.

Example DAG

Here's a simple example of how to use the EmailOperator:

python
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('email_example_dag', default_args=default_args, schedule_interval='@daily') as dag:
    start_task = DummyOperator(task_id='start')

    send_email = EmailOperator(
        task_id='send_email',
        to='recipient_email@example.com',  # Recipient's email address
        subject='Airflow Email Test',
        html_content='<h3>This is a test email sent from Airflow using Gmail!</h3>',
    )

    end_task = DummyOperator(task_id='end')

    start_task >> send_email >> end_task

Explanation

  1. DAG Definition: The DAG is named email_example_dag and is scheduled to run daily.
  2. Tasks:
    • start_task: A dummy task representing the start of the DAG.
    • send_email: The EmailOperator sends an email to the specified recipient. You can customize the subject and the HTML content of the email.
    • end_task: Another dummy task representing the end of the DAG.

Notes

  • HTML Content: The html_content parameter allows you to send HTML-formatted emails; plain text passed here is still delivered, but it is sent with an HTML MIME type.
  • Multiple Recipients: You can send emails to multiple recipients by providing a comma-separated string in the to parameter.
  • Testing: Make sure to test the email sending functionality by running your DAG manually and checking if the email is received.

This setup allows you to send emails from your Airflow workflows using Gmail effectively.
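
To illustrate the multiple-recipients note above, here is a minimal sketch of an EmailOperator with several recipients, a CC address, and an attachment (the addresses and file path are placeholders; place it inside a DAG as in the example above):

python
from airflow.operators.email_operator import EmailOperator

# Sketch: add inside a `with DAG(...)` block.
notify_team = EmailOperator(
    task_id="notify_team",
    to=["alice@example.com", "bob@example.com"],  # list or comma-separated string
    cc="manager@example.com",
    subject="Daily pipeline finished",
    html_content="<p>The daily pipeline completed successfully.</p>",
    files=["/tmp/daily_report.csv"],  # optional attachments (placeholder path)
)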


TriggerDagRunOperator in Apache Airflow

The TriggerDagRunOperator in Apache Airflow is used to programmatically trigger another DAG run from within a DAG. This operator is useful for creating dynamic workflows where the execution of one DAG may depend on the successful completion of another DAG or when you want to kick off a separate workflow based on specific conditions.

How to Use TriggerDagRunOperator

You can use the TriggerDagRunOperator by specifying the dag_id of the target DAG you want to trigger. You can also pass parameters to the triggered DAG if needed.

Example

Here's an example of how to use TriggerDagRunOperator in Airflow:

Main DAG: main_dag.py

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

# Define the main DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('main_dag', default_args=default_args, schedule_interval='@daily') as main_dag:
    start = DummyOperator(task_id='start')

    # Trigger another DAG named 'secondary_dag'
    trigger = TriggerDagRunOperator(
        task_id='trigger_secondary_dag',
        trigger_dag_id='secondary_dag',  # The ID of the DAG to trigger
        wait_for_completion=True,        # Wait for the triggered DAG to complete
        conf={"key": "value"},           # Optional parameters to pass to the triggered DAG
    )

    end = DummyOperator(task_id='end')

    start >> trigger >> end

Triggered DAG: secondary_dag.py

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Define the secondary DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('secondary_dag', default_args=default_args, schedule_interval='@daily') as secondary_dag:
    start_secondary = DummyOperator(task_id='start_secondary')

    # Access the `conf` parameter passed by the triggering DAG
    def print_config(**kwargs):
        conf = kwargs['dag_run'].conf
        print(f"Received config: {conf}")

    print_conf = PythonOperator(
        task_id='print_config',
        python_callable=print_config,
        provide_context=True,
    )

    end_secondary = DummyOperator(task_id='end_secondary')

    start_secondary >> print_conf >> end_secondary

Explanation

  1. Main DAG (main_dag):

    • Contains a DummyOperator task named start to represent the beginning of the DAG.
    • Uses the TriggerDagRunOperator to trigger another DAG named secondary_dag.
    • The wait_for_completion parameter is set to True, meaning the main DAG will wait for the triggered DAG to finish before moving on to the next task (end).
    • Optionally, you can pass a configuration dictionary using the conf parameter, which can be accessed in the triggered DAG.
  2. Triggered DAG (secondary_dag):

    • Contains a DummyOperator to start and end the DAG.
    • A PythonOperator named print_config is used to demonstrate how to access the configuration passed from the main DAG. The configuration can be accessed using kwargs['dag_run'].conf.

Notes

  • Cross-DAG Dependencies: The TriggerDagRunOperator is particularly useful for establishing dependencies across different DAGs in a workflow.
  • Configuration: You can pass any parameters in the conf dictionary that the triggered DAG can read and use as needed.
  • Error Handling: Handle potential errors in the triggered DAG to avoid failures that could affect the main DAG execution.

This setup allows you to create a dynamic and interconnected workflow in Airflow by triggering one DAG from another.
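
If you need to pass run-specific values to the triggered DAG, the conf dictionary is a templated field in recent versions of the operator, so Jinja expressions such as the logical date are rendered before the trigger fires (a sketch under that assumption):

python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Sketch: forward the logical date of the parent run to the triggered DAG.
trigger_with_date = TriggerDagRunOperator(
    task_id="trigger_secondary_dag_with_date",
    trigger_dag_id="secondary_dag",
    conf={"run_date": "{{ ds }}"},  # rendered by Jinja before the trigger fires
    wait_for_completion=True,
)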


SubDagOperator in Apache Airflow

The SubDagOperator in Apache Airflow allows you to create a DAG within another DAG, enabling you to manage and execute a group of tasks as a single unit. This can help organize complex workflows by breaking them down into smaller, reusable components.

How to Use SubDagOperator

To use the SubDagOperator, you need to define a sub-DAG function that specifies the tasks to be executed in the sub-DAG. Then, you can instantiate a SubDagOperator in your main DAG.

Example

Here's a basic example demonstrating how to use SubDagOperator in Airflow:

Main DAG: main_dag.py

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime

# Define the sub-DAG function
def create_subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        schedule_interval='@daily',
    )

    with dag_subdag:
        start = DummyOperator(task_id='start')
        process = DummyOperator(task_id='process')
        end = DummyOperator(task_id='end')
        start >> process >> end

    return dag_subdag

# Define the main DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('main_dag', default_args=default_args, schedule_interval='@daily') as main_dag:
    start_main = DummyOperator(task_id='start_main')

    # Create a SubDagOperator; the sub-DAG's id must be "<parent_dag_id>.<task_id>",
    # so the child name passed here matches the task_id below.
    subdag_task = SubDagOperator(
        task_id='subdag_task',
        subdag=create_subdag('main_dag', 'subdag_task', default_args),
        dag=main_dag,
    )

    end_main = DummyOperator(task_id='end_main')

    start_main >> subdag_task >> end_main

Explanation

  1. Sub-DAG Function:

    • The create_subdag function defines a sub-DAG. It takes the parent_dag_name, child_dag_name, and args as parameters.
    • Inside this function, we define the tasks for the sub-DAG (start, process, end) using DummyOperator.
  2. Main DAG:

    • The main DAG is defined with a DAG context.
    • A DummyOperator named start_main represents the start of the main DAG.
    • The SubDagOperator is instantiated, calling the create_subdag function to create the sub-DAG. The sub-DAG will execute the tasks defined in the create_subdag function.
    • A DummyOperator named end_main represents the end of the main DAG.
  3. Task Dependencies:

    • The main DAG starts with start_main, followed by the execution of the sub-DAG (subdag_task), and finally, it ends with end_main.

Notes

  • Scheduling: The sub-DAG will inherit the scheduling from the parent DAG, so you don’t need to specify the schedule_interval in the sub-DAG.
  • Complexity: Use SubDagOperator judiciously, as too many nested DAGs can make the workflow harder to manage and visualize.
  • Deprecation: SubDagOperator is deprecated in recent Airflow releases and is not recommended for high-concurrency scenarios, since the SubDagOperator occupies a worker slot for the whole duration of the sub-DAG and can starve the executor; TaskGroups are the preferred alternative (see the sketch below).

This setup allows you to encapsulate and manage complex workflows effectively within a main DAG using the SubDagOperator.
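
Because of the limitations noted above, newer Airflow releases favor TaskGroup over SubDagOperator for grouping tasks. Here is a minimal sketch of the same start/process/end grouping expressed as a TaskGroup (assuming Airflow 2.x):

python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG(
    "main_dag_with_taskgroup",
    start_date=datetime(2024, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    start_main = DummyOperator(task_id="start_main")

    # Group the sub-workflow's tasks without spawning a separate DAG run
    with TaskGroup(group_id="subworkflow") as subworkflow:
        start = DummyOperator(task_id="start")
        process = DummyOperator(task_id="process")
        end = DummyOperator(task_id="end")
        start >> process >> end

    end_main = DummyOperator(task_id="end_main")

    start_main >> subworkflow >> end_main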


ExternalTaskSensor in Apache Airflow

The ExternalTaskSensor in Apache Airflow is used to wait for a task in a different DAG (Directed Acyclic Graph) to complete before proceeding. This operator is useful when you have dependencies between tasks across multiple DAGs.

How to Use ExternalTaskSensor

To use the ExternalTaskSensor, you need to specify the task_id of the task you want to wait for and the dag_id of the DAG that contains that task. You can also define the execution date for which the task should be checked.

Example

Here's a basic example of how to use ExternalTaskSensor in an Airflow DAG:

Setup

  1. DAG 1: The first DAG, which contains the task that will be waited on.
  2. DAG 2: The second DAG, which includes the ExternalTaskSensor.

DAG 1: dag_one.py

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('dag_one', default_args=default_args, schedule_interval='@daily') as dag_one:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    start_task >> end_task

DAG 2: dag_two.py

python
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 10, 1),
}

with DAG('dag_two', default_args=default_args, schedule_interval='@daily') as dag_two:
    wait_for_dag_one = ExternalTaskSensor(
        task_id='wait_for_dag_one',
        external_dag_id='dag_one',
        external_task_id='end_task',  # This is the task in dag_one to wait for
        mode='poke',                  # or 'reschedule'
        timeout=600,                  # Timeout after 10 minutes
        poke_interval=30,             # Check every 30 seconds
    )

    next_task = DummyOperator(task_id='next_task')

    wait_for_dag_one >> next_task

Explanation

  1. DAG 1 (dag_one):

    • Contains two dummy tasks: start_task and end_task.
    • The end_task is the one we will wait for in dag_two.
  2. DAG 2 (dag_two):

    • Uses the ExternalTaskSensor to wait for the end_task in dag_one.
    • The mode can be set to poke (checks periodically) or reschedule (reschedules the task when it’s not yet done).
    • You can set timeout to specify how long to wait for the external task to complete and poke_interval for how often to check.

Notes

  • Ensure both DAGs are in the same Airflow instance.
  • By default, the ExternalTaskSensor looks for a dag_one run with the same logical (execution) date as the current dag_two run, so both DAGs' schedules and start_date should line up; use execution_delta or execution_date_fn when they differ (see the sketch below).
  • The external_task_id must match the task_id in the external DAG.

This setup allows dag_two to execute next_task only after end_task in dag_one has completed successfully.
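
Here is a hedged sketch of the execution_delta variant mentioned in the notes, assuming dag_one's runs are scheduled two hours earlier than dag_two's:

python
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import timedelta

# Sketch: wait for the dag_one run whose logical date is 2 hours before this DAG's run.
wait_for_earlier_dag = ExternalTaskSensor(
    task_id="wait_for_earlier_dag",
    external_dag_id="dag_one",
    external_task_id="end_task",
    execution_delta=timedelta(hours=2),  # difference between the two DAGs' logical dates
    mode="reschedule",
    timeout=600,
)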


FileSensor operator in Apache Airflow

The FileSensor operator in Apache Airflow is used to monitor the existence of a specific file in a directory. It pauses the execution of a task until the specified file is detected. This is especially useful for workflows that depend on data files being available before processing begins.

Example: Using FileSensor to Wait for a File

Suppose we have a DAG that processes data once a CSV file is available in a specific directory. We’ll use FileSensor to check if the file exists before running any data processing.

Example DAG with FileSensor

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

# Define file path to monitor
file_path = "/path/to/your/file/data.csv"

# Define the DAG
with DAG(
    dag_id="file_sensor_example_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Define the FileSensor to check for the file's existence
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath=file_path,       # Path of the file to wait for
        fs_conn_id="fs_default",  # File system connection ID
        poke_interval=60,         # Check every 60 seconds
        timeout=60 * 60 * 2,      # Timeout after 2 hours if the file isn't found
        mode="poke",              # Use "poke" mode for frequent checking
    )

    # Define a data processing task
    def process_data():
        print("Data file found. Processing data...")

    data_processing = PythonOperator(
        task_id="data_processing",
        python_callable=process_data,
    )

    # Set task dependencies
    wait_for_file >> data_processing

Explanation of the DAG

  1. FileSensor:

    • The wait_for_file task uses FileSensor to wait until the file specified by filepath (data.csv) exists.
    • fs_conn_id="fs_default" is the connection ID for the file system, set in Airflow's Connections UI. This can be configured to point to different file systems or directories as needed.
    • poke_interval=60 makes the sensor check every 60 seconds to see if the file exists.
    • timeout=60 * 60 * 2 (2 hours) specifies that the sensor should stop waiting after 2 hours if the file has not appeared.
    • mode="poke" keeps the task slot occupied while checking, though mode="reschedule" is an alternative that frees the slot between checks.
  2. Data Processing Task:

    • The data_processing task runs only once the FileSensor detects the file, ensuring that the data processing begins only after the file is available.
  3. Dependencies:

    • The FileSensor task must succeed before data_processing runs, ensuring a file dependency in the workflow.

Benefits

  • Dependency Management: Ensures that tasks depending on file availability only run when the file exists.
  • Flexible Monitoring: Can be configured to check at different intervals and set timeouts for missing files.
  • Efficient Scheduling: With mode="reschedule", the sensor can save resources by not blocking worker slots during the waiting period.
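
To illustrate the last point, here is a variant of the sensor above that uses reschedule mode (same placeholder path and connection ID; place it inside the DAG as before):

python
# Sketch: same sensor, but releasing its worker slot between checks.
wait_for_file_reschedule = FileSensor(
    task_id="wait_for_file_reschedule",
    filepath=file_path,
    fs_conn_id="fs_default",
    poke_interval=5 * 60,   # check every 5 minutes
    timeout=60 * 60 * 2,
    mode="reschedule",      # frees the worker slot between pokes
)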

TimeSensor operator in Apache Airflow

The TimeSensor operator in Apache Airflow is used to pause a DAG's execution until a specified time of day. This is useful when you want a task to run only after a certain time, regardless of when the DAG starts. Once the specified time is reached, the TimeSensor allows the DAG to proceed to the next tasks.

Example: Using TimeSensor to Wait Until a Specific Time

Let’s say we want a DAG to start with a task that waits until 2:00 PM each day before running the actual data processing tasks. We’ll use TimeSensor to accomplish this.

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, time

# Define the DAG
with DAG(
    dag_id="time_sensor_example_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Define the TimeSensor to wait until 2:00 PM
    wait_until_2pm = TimeSensor(
        task_id="wait_until_2pm",
        target_time=time(14, 0),  # 2:00 PM in 24-hour format
        poke_interval=60 * 5,     # Check every 5 minutes
        mode="reschedule",        # Use 'reschedule' mode to free up resources while waiting
    )

    # Define a data processing task
    def process_data():
        print("Processing data after 2:00 PM")

    data_processing = PythonOperator(
        task_id="data_processing",
        python_callable=process_data,
    )

    # Set task dependencies
    wait_until_2pm >> data_processing

Explanation of the DAG

  1. TimeSensor:

    • The wait_until_2pm task is a TimeSensor that pauses the DAG until it’s 2:00 PM (14:00).
    • The target_time argument specifies the time to wait until. In this example, it’s set to time(14, 0), which is 2:00 PM.
    • The poke_interval is set to 5 minutes (300 seconds), meaning the sensor will check every 5 minutes if the current time has reached 2:00 PM.
    • The mode is set to "reschedule", which frees up worker slots while waiting, allowing other tasks to run.
  2. Data Processing Task:

    • The data_processing task runs only after the TimeSensor has allowed the DAG to proceed, ensuring it starts after 2:00 PM.
  3. Dependencies:

    • The DAG only proceeds to the data_processing task once the TimeSensor condition (2:00 PM) is met.

Benefits

  • Controlled Timing: TimeSensor ensures tasks don’t run until a specific time.
  • Efficient Resource Management: Using "reschedule" mode helps free up resources while waiting, reducing the load on the Airflow scheduler.

DummyOperator with BranchPythonOperator

Here is an example of using DummyOperator with BranchPythonOperator in a DAG. In this scenario, the DAG decides between two branches based on a condition, with a DummyOperator marking the end of each branch.

Example: Conditional Workflow with Branching and Dummy Operators

Let’s say we want the DAG to check if a number is even or odd. Based on this condition, it will follow one of two paths, with each path ending in a DummyOperator.

python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
import random

# Define the DAG
with DAG(
    dag_id="branching_with_dummy_operator_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Branching function to decide path based on even or odd check
    def decide_even_or_odd():
        random_number = random.randint(1, 100)
        print(f"Generated random number: {random_number}")
        if random_number % 2 == 0:
            return "even_task"  # must match the task_id of the branch to follow
        else:
            return "odd_task"

    # BranchPythonOperator to choose the path
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=decide_even_or_odd,
    )

    # Define the tasks for each path
    def process_even():
        print("Processing even number...")

    def process_odd():
        print("Processing odd number...")

    even_task = PythonOperator(
        task_id="even_task",
        python_callable=process_even,
    )

    odd_task = PythonOperator(
        task_id="odd_task",
        python_callable=process_odd,
    )

    # Dummy operators to mark the end of each branch
    even_end = DummyOperator(task_id="even_end")
    odd_end = DummyOperator(task_id="odd_end")

    # Define dependencies
    branching >> [even_task, odd_task]
    even_task >> even_end
    odd_task >> odd_end

Explanation of the DAG

  1. Branching Logic:

    • The decide_even_or_odd function generates a random number between 1 and 100.
    • If the number is even, it returns "even_task", directing the workflow down the even path.
    • If the number is odd, it returns "odd_task", directing the workflow down the odd path.
  2. BranchPythonOperator:

    • The branching task uses decide_even_or_odd to choose between the two paths, even_task and odd_task.
  3. Dummy Operators as End Points:

    • Each branch ends with a DummyOperator, even_end or odd_end, serving as placeholders to mark the end of each path.
  4. Skipped Tasks:

    • The branch not selected (either even_task or odd_task) will be marked as skipped, along with its corresponding DummyOperator end marker (even_end or odd_end).

Benefits

Using DummyOperator in combination with BranchPythonOperator clearly defines the end of each conditional path, making the workflow easier to manage and read. It also allows easy extension if you want to add more steps after each branch later.
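
If you later add a common downstream step, the join task needs a trigger rule that tolerates the skipped branch. A minimal sketch, meant to be appended inside the DAG above:

python
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Sketch: a common end point after the two branches; "none_failed" fires even
# when one upstream branch was skipped by the BranchPythonOperator.
join = DummyOperator(
    task_id="join",
    trigger_rule=TriggerRule.NONE_FAILED,
)

[even_end, odd_end] >> join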


DummyOperator in Apache Airflow

The DummyOperator in Apache Airflow is a simple operator that performs no action. It’s often used as a placeholder in DAGs to structure or organize tasks without running any logic. It’s helpful for marking the start or end of a DAG, grouping tasks, or adding checkpoints in complex workflows.

Common Use Cases for DummyOperator

  1. Start or End Markers: Define the beginning or end of a DAG.
  2. Logical Grouping: Group or branch tasks for easier readability.
  3. Conditional Paths: Used with BranchPythonOperator to create conditional paths without running a task.

Example DAG with DummyOperator

Here’s an example DAG that uses DummyOperator to mark the start and end of the workflow. This DAG performs some data processing steps with clear task separation using DummyOperator.

python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

# Define the DAG
with DAG(
    dag_id="dummy_operator_example_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Define the start and end dummy operators
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    # Define some Python tasks
    def extract_data():
        print("Extracting data...")

    def process_data():
        print("Processing data...")

    def load_data():
        print("Loading data...")

    extract_task = PythonOperator(
        task_id="extract_data",
        python_callable=extract_data,
    )

    process_task = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
    )

    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data,
    )

    # Set up dependencies with DummyOperator
    start >> extract_task >> process_task >> load_task >> end

Explanation of the DAG

  1. Start and End Tasks: The start and end DummyOperators mark the boundaries of the workflow, improving readability and making it easier to adjust dependencies.
  2. Data Processing Tasks: The DAG includes three tasks: extract_data, process_data, and load_data.
  3. Task Dependencies:
    • The tasks are chained to execute in sequence, starting with start, followed by each data processing task, and ending with end.

Benefits

Using DummyOperator here makes the workflow cleaner and more organized by clearly marking the beginning and end, allowing for easier DAG maintenance and a logical flow.
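
As a side note, Airflow 2.3 and later rename DummyOperator to EmptyOperator; only the import and class name change (a brief sketch, assuming Airflow 2.3+):

python
from airflow.operators.empty import EmptyOperator

# Drop-in replacement for DummyOperator in Airflow 2.3+
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")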


BranchPythonOperator in Apache Airflow

The BranchPythonOperator in Apache Airflow allows you to direct your workflow down different paths based on conditions you set in a Python function. It’s especially useful when you need to implement conditional logic within your DAG to decide which tasks to execute next.

How BranchPythonOperator Works

  1. Define a Branching Function: The function you define for BranchPythonOperator should return the task_id of the next task(s) you want to execute.
  2. Skipping Unselected Branches: Only the branch specified by the function will execute, while other branches are automatically marked as skipped.

Branching Based on Weekday or Weekend

This example will demonstrate how to use BranchPythonOperator to direct the workflow down different paths based on whether the current day is a weekday or a weekend.

python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
import pendulum

# Define the DAG
with DAG(
    dag_id="weekday_weekend_branching_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Define branching function
    def weekday_or_weekend():
        current_day = pendulum.now().day_of_week  # Monday is 0 and Sunday is 6
        if current_day < 5:  # Weekdays are 0-4
            return "weekday_task"
        else:                # Weekend is 5-6
            return "weekend_task"

    # BranchPythonOperator to determine the branch
    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=weekday_or_weekend,
        provide_context=True,
    )

    # Define tasks for weekday and weekend paths
    def weekday_task_func():
        print("Today is a weekday. Running weekday task.")

    def weekend_task_func():
        print("Today is a weekend. Running weekend task.")

    # Task for weekday path
    weekday_task = PythonOperator(
        task_id="weekday_task",
        python_callable=weekday_task_func,
    )

    # Task for weekend path
    weekend_task = PythonOperator(
        task_id="weekend_task",
        python_callable=weekend_task_func,
    )

    # Define dependencies
    branching >> [weekday_task, weekend_task]

Explanation

  1. Branching Logic:

    • The function weekday_or_weekend() checks the current day of the week using pendulum.now().day_of_week.
    • It returns "weekday_task" if it’s a weekday, otherwise "weekend_task".
  2. BranchPythonOperator:

    • The branching task uses the weekday_or_weekend function to decide the path.
    • It executes either the weekday_task or weekend_task based on the returned value.
  3. Skipping Unchosen Tasks:

    • Only the task with the matching task_id will run, while the other task will be marked as skipped.

Output Example

If this DAG is executed on a Monday:

  • Output: "Today is a weekday. Running weekday task."
  • Skipped: The weekend_task
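
The branching callable is not limited to a single task_id; it may also return a list when several branches should run. A sketch extending the example above with a hypothetical log_task branch (pendulum is imported as in the example):

python
def weekday_or_weekend_with_logging():
    current_day = pendulum.now().day_of_week
    # Returning a list runs several branches; everything not returned is skipped.
    if current_day < 5:
        return ["weekday_task", "log_task"]  # hypothetical extra logging branch
    return ["weekend_task", "log_task"]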

schedule_interval in Airflow DAGs

In Apache Airflow, the schedule_interval parameter defines how frequently a DAG should run. Here are the main options and formats you can use:

1. Preset Schedule Intervals (String aliases)

  • "@once": Run the DAG only once, regardless of start_date.
  • "@hourly": Run the DAG every hour.
  • "@daily": Run the DAG once a day at midnight (00:00 UTC).
  • "@weekly": Run the DAG once a week at midnight on Sunday (00:00 UTC).
  • "@monthly": Run the DAG once a month at midnight on the first day of the month.
  • "@quarterly": Run the DAG at midnight on the first day of each quarter (January, April, July, October).
  • "@yearly" or "@annually": Run the DAG once a year at midnight on January 1.

2. Cron Expressions (String format)

  • You can use cron syntax to define custom schedules. Format: minute hour day month day_of_week.
    • Examples:
      • "0 9 * * *": Run daily at 9:00 AM UTC.
      • "15 14 * * 1": Run every Monday at 14:15 UTC.
      • "0 0 1 * *": Run at midnight on the first day of each month.

3. TimeDelta (Using datetime.timedelta)

  • Use timedelta for intervals in hours, minutes, days, etc., instead of a specific time of day.
  • Example:
    python
    from datetime import timedelta

    schedule_interval=timedelta(hours=6)  # Runs every 6 hours

4. None

  • Setting schedule_interval=None means the DAG will only run if manually triggered.
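
Putting one of these options into a DAG definition looks like the following sketch, which uses the cron form (the DAG id and task are placeholders; swap the string for a preset, a timedelta, or None as needed):

python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

# Runs every day at 09:00 UTC.
with DAG(
    dag_id="nine_am_report",
    start_date=datetime(2024, 10, 1),
    schedule_interval="0 9 * * *",
    catchup=False,
) as dag:
    run_report = DummyOperator(task_id="run_report")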


Python Operator in Airflow

Here’s an example Airflow DAG using PythonOperator to execute three separate Python files, each performing different file and DataFrame operations. Let's assume the three files, named task1.py, task2.py, and task3.py, are located in a directory called bin, and each file contains a function that performs specific data-related tasks.

DAG Code

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
import os

# Add bin directory to the system path to import the scripts
sys.path.insert(0, os.path.abspath("bin"))

# Importing the functions from each Python file
from task1 import task1_function
from task2 import task2_function
from task3 import task3_function

# Define the DAG
with DAG(
    dag_id="data_processing_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Task 1: Call task1.py function to perform file operations
    task_1 = PythonOperator(
        task_id="task_1_file_operation",
        python_callable=task1_function,
    )

    # Task 2: Call task2.py function to process DataFrames
    task_2 = PythonOperator(
        task_id="task_2_dataframe_operation",
        python_callable=task2_function,
    )

    # Task 3: Call task3.py function for additional data processing
    task_3 = PythonOperator(
        task_id="task_3_additional_processing",
        python_callable=task3_function,
    )

    # Define task dependencies
    task_1 >> task_2 >> task_3

Contents of Each Python File in bin Directory

bin/task1.py

python
import pandas as pd

def task1_function():
    # Example file operation: Read and write a CSV file
    data = pd.DataFrame({"Column1": [1, 2, 3], "Column2": ["A", "B", "C"]})
    data.to_csv("/path/to/output/task1_output.csv", index=False)
    print("Task 1: File operation completed.")

bin/task2.py

python
import pandas as pd

def task2_function():
    # Example DataFrame operation: Load and transform data
    data = pd.read_csv("/path/to/output/task1_output.csv")
    data["Column1_squared"] = data["Column1"] ** 2
    data.to_csv("/path/to/output/task2_output.csv", index=False)
    print("Task 2: DataFrame operation completed.")

bin/task3.py

python
import pandas as pd

def task3_function():
    # Example additional processing: Load data, filter, and save
    data = pd.read_csv("/path/to/output/task2_output.csv")
    filtered_data = data[data["Column1_squared"] > 4]
    filtered_data.to_csv("/path/to/output/task3_output.csv", index=False)
    print("Task 3: Additional processing completed.")

Explanation

  1. Task 1 reads a CSV file, performs some file operations, and saves the output.
  2. Task 2 loads the CSV file created in Task 1, performs a DataFrame transformation, and saves the updated data.
  3. Task 3 loads the transformed data from Task 2, applies filtering, and saves the final result.

Each task depends on the output of the previous one, forming a simple data-processing pipeline. Adjust the file paths as needed.
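
Instead of hard-coding the /path/to/output locations inside each script, you can pass them in from the DAG with op_kwargs. A sketch, assuming task1_function is rewritten to accept an output_path parameter:

python
# Hypothetical variant of task_1 that passes the output location explicitly.
task_1 = PythonOperator(
    task_id="task_1_file_operation",
    python_callable=task1_function,
    op_kwargs={"output_path": "/path/to/output/task1_output.csv"},
)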


Sunday, October 13, 2024

Running Airflow on your local system using docker-compose

Steps to Perform on Linux Shell

    1. docker --version
    2. docker-compose --version
    3. cd docker_tutorial/
    4. cd airflow-docker/
    5. curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
       (reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html)
    6. code .

After VS Code is open

Set the Volumes:
  1. Open the terminal 
  2. docker --version
  3. cd sources/
  4. mkdir ./dags ./logs ./config ./plugins
  5. ls
  6. cd ..

Create and Run the Airflow containers
docker-compose up airflow-init
docker-compose up


docker-compose.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.10.2
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # The following line can be used to set a custom config file, stored in the local config folder
    # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
    # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licencing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:


Pushing the code to repo

create a new repository on the command line
echo "# apache_arflow" >> README.md
git init
git add README.md
git commit -m "first commit"
git branch -M main
git remote add origin https://github.com/tariniteam/apache_arflow.git
git push -u origin main

…or push an existing repository from the command line

git remote add origin https://github.com/tariniteam/apache_arflow.git
git branch -M main
git push -u origin main

