About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, spent mostly on web and desktop application development, and has sound knowledge of various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari


Thursday, October 31, 2024

Python Operator in Airflow

Here’s an example Airflow DAG using PythonOperator to execute three separate Python files, each performing different file and DataFrame operations. Let's assume the three files, named task1.py, task2.py, and task3.py, are located in a directory called bin, and each file contains a function that performs specific data-related tasks.

DAG Code

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
import os

# Add the bin directory to the system path so the task scripts can be imported
sys.path.insert(0, os.path.abspath("bin"))

# Import the functions from each Python file
from task1 import task1_function
from task2 import task2_function
from task3 import task3_function

# Define the DAG
with DAG(
    dag_id="data_processing_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Task 1: Call task1.py function to perform file operations
    task_1 = PythonOperator(
        task_id="task_1_file_operation",
        python_callable=task1_function,
    )

    # Task 2: Call task2.py function to process DataFrames
    task_2 = PythonOperator(
        task_id="task_2_dataframe_operation",
        python_callable=task2_function,
    )

    # Task 3: Call task3.py function for additional data processing
    task_3 = PythonOperator(
        task_id="task_3_additional_processing",
        python_callable=task3_function,
    )

    # Define task dependencies
    task_1 >> task_2 >> task_3
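
Note that os.path.abspath("bin") is resolved against whatever working directory the scheduler or worker happens to use, so the imports can fail depending on how Airflow is started. Assuming the bin folder sits next to the DAG file (an assumption about your layout), a path derived from __file__ is a more robust alternative, sketched below:

python
import os
import sys

# Resolve bin relative to this DAG file instead of the current working directory
BIN_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bin")
sys.path.insert(0, BIN_DIR)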

Contents of Each Python File in bin Directory

bin/task1.py

python
import pandas as pd

def task1_function():
    # Example file operation: create a small DataFrame and write it to a CSV file
    data = pd.DataFrame({"Column1": [1, 2, 3], "Column2": ["A", "B", "C"]})
    data.to_csv("/path/to/output/task1_output.csv", index=False)
    print("Task 1: File operation completed.")

bin/task2.py

python
import pandas as pd

def task2_function():
    # Example DataFrame operation: load and transform data
    data = pd.read_csv("/path/to/output/task1_output.csv")
    data["Column1_squared"] = data["Column1"] ** 2
    data.to_csv("/path/to/output/task2_output.csv", index=False)
    print("Task 2: DataFrame operation completed.")

bin/task3.py

python
import pandas as pd

def task3_function():
    # Example additional processing: load data, filter, and save
    data = pd.read_csv("/path/to/output/task2_output.csv")
    filtered_data = data[data["Column1_squared"] > 4]
    filtered_data.to_csv("/path/to/output/task3_output.csv", index=False)
    print("Task 3: Additional processing completed.")
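
As an optional convenience (not required by the DAG), each script can include a standard entry-point guard so it can be run directly for a quick local check before wiring it into Airflow. A minimal sketch for task3.py:

python
# Appended to bin/task3.py for standalone testing, e.g. `python bin/task3.py`
if __name__ == "__main__":
    task3_function()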

Explanation

  1. Task 1 creates a small DataFrame and writes it out as a CSV file.
  2. Task 2 loads the CSV file created in Task 1, performs a DataFrame transformation, and saves the updated data.
  3. Task 3 loads the transformed data from Task 2, applies filtering, and saves the final result.

Each task depends on the output of the previous one, forming a simple data processing pipeline. Adjust the file paths as needed for your environment.
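
Rather than hardcoding the paths inside each script, one option is to pass an output directory to the callables through the operator's op_kwargs parameter. The sketch below assumes the task functions are modified to accept an output_dir argument (not shown in the files above):

python
import os
import pandas as pd

OUTPUT_DIR = "/path/to/output"  # assumed location; adjust for your environment

# In the DAG, each operator forwards the directory to its callable
task_1 = PythonOperator(
    task_id="task_1_file_operation",
    python_callable=task1_function,
    op_kwargs={"output_dir": OUTPUT_DIR},
)

# In bin/task1.py, the function would then take the directory as a parameter
def task1_function(output_dir):
    data = pd.DataFrame({"Column1": [1, 2, 3], "Column2": ["A", "B", "C"]})
    data.to_csv(os.path.join(output_dir, "task1_output.csv"), index=False)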

Post Reference: Vikram Aristocratic Elfin Share
