About Me

My photo
Mumbai, Maharastra, India
He has more than 7.6 years of experience in the software development. He has spent most of the times in web/desktop application development. He has sound knowledge in various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari

Search This Blog

Thursday, October 31, 2024

SimpleHttpOperator to call the API endpoint in Apache Airflow

Example of a FastAPI application that exposes a simple API endpoint and a corresponding Airflow DAG that uses the SimpleHttpOperator to call the API endpoint.

Step 1: FastAPI Application

First, create a FastAPI application with a simple endpoint. Save this code in a file named main.py.

python

from fastapi import FastAPI from pydantic import BaseModel from typing import Dict app = FastAPI() class Item(BaseModel): name: str description: str = None price: float tax: float = None @app.post("/items/") async def create_item(item: Item) -> Dict[str, float]: total_price = item.price + (item.tax if item.tax else 0) return {"total_price": total_price}

To run the FastAPI app, save this file and run:

bash

uvicorn main:app --host 0.0.0.0 --port 8000

This starts the FastAPI server on http://localhost:8000. The endpoint /items/ accepts POST requests with JSON data containing name, description, price, and tax.

Step 2: Airflow DAG with SimpleHttpOperator

In your Airflow DAG, use SimpleHttpOperator to make a POST request to the FastAPI endpoint. Below is a sample DAG that calls the FastAPI endpoint and logs the response. Save this as call_fastapi_dag.py.


from airflow import DAG from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.utils.dates import days_ago from datetime import timedelta import json default_args = { "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=5), } with DAG( "call_fastapi_dag", default_args=default_args, description="A DAG to call FastAPI endpoint", schedule_interval=timedelta(days=1), start_date=days_ago(1), catchup=False, ) as dag: call_fastapi_task = SimpleHttpOperator( task_id="call_fastapi_task", http_conn_id="fastapi_connection", endpoint="items/", method="POST", data=json.dumps({"name": "Widget", "description": "A widget", "price": 10.99, "tax": 1.99}), headers={"Content-Type": "application/json"}, response_check=lambda response: response.status_code == 200, log_response=True, ) call_fastapi_task

Step 3: Configure the Airflow Connection

  1. Go to the Airflow UI, then to Admin > Connections.
  2. Add a new connection with the following details:
    • Connection Id: fastapi_connection
    • Connection Type: HTTP
    • Host: http://localhost:8000

Explanation

  • FastAPI app: The /items/ endpoint calculates the total price based on price and tax in the request.
  • Airflow DAG: The SimpleHttpOperator sends a POST request to the FastAPI app, with the response being logged in the Airflow task logs.
Post Reference: Vikram Aristocratic Elfin Share

No comments:

Post a Comment