About Me

Mumbai, Maharashtra, India
He has more than 7.6 years of experience in software development, most of it spent on web and desktop application development. He has sound knowledge of various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari

Showing posts with label Docker. Show all posts

Thursday, October 31, 2024

schedule_interval in Airflow DAGs

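In Apache Airflow, the schedule_interval parameter defines how frequently a DAG should run. (From Airflow 2.4 onward, the schedule parameter accepts the same values and supersedes schedule_interval, which is deprecated.) Here are the main options and formats you can use: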

1. Preset Schedule Intervals (String aliases)

  • "@once": Run the DAG only once, regardless of start_date.
  • "@hourly": Run the DAG every hour.
  • "@daily": Run the DAG once a day at midnight (00:00 UTC).
  • "@weekly": Run the DAG once a week at midnight on Sunday (00:00 UTC).
  • "@monthly": Run the DAG once a month at midnight on the first day of the month.
  • "@quarterly": Run the DAG at midnight on the first day of each quarter (January, April, July, October).
  • "@yearly" or "@annually": Run the DAG once a year at midnight on January 1.
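
To make the aliases above concrete, here is a minimal sketch that maps each recurring preset to the cron expression it stands for. The dictionary and the helper name `to_cron` are illustrative and not part of Airflow's API; "@once" is left out because it has no cron equivalent.

```python
# Cron equivalents of the recurring preset aliases listed above.
# "@once" is omitted: it is a single run, not a recurring schedule.
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule: str) -> str:
    """Return the cron expression for a preset alias, or the input unchanged."""
    return PRESET_TO_CRON.get(schedule, schedule)


print(to_cron("@weekly"))
```

Reading a preset as its cron form makes it easier to see exactly when a run fires, for example that "@weekly" means Sunday at 00:00.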

2. Cron Expressions (String format)

  • You can use cron syntax to define custom schedules. Format: minute hour day month day_of_week.
    • Examples:
      • "0 9 * * *": Run daily at 9:00 AM UTC.
      • "15 14 * * 1": Run every Monday at 14:15 UTC.
      • "0 0 1 * *": Run at midnight on the first day of each month.
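
As a small illustration of the five-field format, the sketch below splits a cron expression into named fields. The function name `split_cron` is hypothetical, and it performs no validation beyond counting fields; Airflow itself relies on a full cron parser.

```python
from typing import Dict

# Field names in the order given above: minute hour day month day_of_week.
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def split_cron(expression: str) -> Dict[str, str]:
    """Split a five-field cron expression into a field-name -> value mapping."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("15 14 * * 1")
print(fields)
```

For "15 14 * * 1" this gives minute 15, hour 14, and day_of_week 1 (Monday), matching the second example above.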

3. TimeDelta (Using datetime.timedelta)

  • Use timedelta for intervals in hours, minutes, days, etc., instead of a specific time of day.
  • Example:
    from datetime import timedelta
    schedule_interval=timedelta(hours=6)  # Runs every 6 hours
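
To show what a fixed interval means in practice, this sketch lists the first few points in time spaced by a timedelta. It only illustrates the spacing; when Airflow actually triggers each run also depends on start_date and its data-interval rules. The helper name `interval_starts` and the sample date are assumptions made for the example.

```python
from datetime import datetime, timedelta
from typing import List


def interval_starts(first: datetime, interval: timedelta, count: int) -> List[datetime]:
    """Return `count` timestamps spaced `interval` apart, starting at `first`."""
    return [first + i * interval for i in range(count)]


starts = interval_starts(datetime(2024, 10, 31), timedelta(hours=6), 4)
for moment in starts:
    print(moment.isoformat())
```

Unlike a cron expression, the spacing here is anchored to the first timestamp rather than to fixed clock times.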

4. None

  • Setting schedule_interval=None means the DAG will only run if manually triggered.

Post Reference: Vikram Aristocratic Elfin Share

Python Operator in Airflow

Here’s an example Airflow DAG using PythonOperator to execute three separate Python files, each performing different file and DataFrame operations. Let's assume the three files, named task1.py, task2.py, and task3.py, are located in a directory called bin, and each file contains a function that performs specific data-related tasks.

DAG Code

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
import os

# Add the bin directory to the system path so the scripts can be imported.
# The path is resolved relative to this DAG file (assumption: bin/ sits next
# to it) rather than the process's working directory.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "bin"))

# Import the functions from each Python file
from task1 import task1_function
from task2 import task2_function
from task3 import task3_function

# Define the DAG
with DAG(
    dag_id="data_processing_dag",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Task 1: Call the task1.py function to perform file operations
    task_1 = PythonOperator(
        task_id="task_1_file_operation",
        python_callable=task1_function,
    )

    # Task 2: Call the task2.py function to process DataFrames
    task_2 = PythonOperator(
        task_id="task_2_dataframe_operation",
        python_callable=task2_function,
    )

    # Task 3: Call the task3.py function for additional data processing
    task_3 = PythonOperator(
        task_id="task_3_additional_processing",
        python_callable=task3_function,
    )

    # Define task dependencies: task_1 runs first, then task_2, then task_3
    task_1 >> task_2 >> task_3

Contents of Each Python File in bin Directory

bin/task1.py


import pandas as pd


def task1_function():
    # Example file operation: build a small DataFrame and write it to a CSV file
    data = pd.DataFrame({"Column1": [1, 2, 3], "Column2": ["A", "B", "C"]})
    data.to_csv("/path/to/output/task1_output.csv", index=False)
    print("Task 1: File operation completed.")

bin/task2.py


import pandas as pd


def task2_function():
    # Example DataFrame operation: load the Task 1 output and add a derived column
    data = pd.read_csv("/path/to/output/task1_output.csv")
    data["Column1_squared"] = data["Column1"] ** 2
    data.to_csv("/path/to/output/task2_output.csv", index=False)
    print("Task 2: DataFrame operation completed.")

bin/task3.py


import pandas as pd


def task3_function():
    # Example additional processing: load the Task 2 output, filter, and save
    data = pd.read_csv("/path/to/output/task2_output.csv")
    filtered_data = data[data["Column1_squared"] > 4]
    filtered_data.to_csv("/path/to/output/task3_output.csv", index=False)
    print("Task 3: Additional processing completed.")

Explanation

  1. Task 1 builds a small DataFrame and writes it to a CSV file.
  2. Task 2 loads the CSV file created in Task 1, performs a DataFrame transformation, and saves the updated data.
  3. Task 3 loads the transformed data from Task 2, applies filtering, and saves the final result.

Each task depends on the output of the previous one, making a smooth data processing pipeline. Adjust the file paths as needed.
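
To see what the three steps produce without an Airflow installation, the sketch below replays the same logic outside Airflow. It deliberately swaps pandas for the standard-library csv module and writes to a temporary directory instead of /path/to/output, so it is an approximation of the tasks above rather than the tasks themselves; the function name `run_pipeline` is hypothetical.

```python
import csv
import tempfile
from pathlib import Path


def run_pipeline(out_dir: Path) -> list:
    """Replay the three tasks in order and return the rows of the final file."""
    step1 = out_dir / "task1_output.csv"
    step2 = out_dir / "task2_output.csv"
    step3 = out_dir / "task3_output.csv"
    columns = ["Column1", "Column2", "Column1_squared"]

    # Step 1: write the initial rows.
    with step1.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=columns[:2])
        writer.writeheader()
        writer.writerows({"Column1": n, "Column2": s} for n, s in zip([1, 2, 3], "ABC"))

    # Step 2: load step 1, add the squared column, save.
    with step1.open(newline="") as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        row["Column1_squared"] = int(row["Column1"]) ** 2
    with step2.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)

    # Step 3: load step 2, keep rows whose square is greater than 4, save.
    with step2.open(newline="") as f:
        kept = [r for r in csv.DictReader(f) if int(r["Column1_squared"]) > 4]
    with step3.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(kept)
    return kept


with tempfile.TemporaryDirectory() as tmp:
    result = run_pipeline(Path(tmp))
print(result)
```

With the sample data, only the row with Column1 = 3 survives the filter, because 1 and 4 are not greater than 4 while 9 is.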


Sunday, October 13, 2024

Running Airflow on your local system using docker-compose

Steps to Perform on Linux Shell

    1. docker --version
    2. docker-compose --version
    3. cd docker_tutorial/
    4. cd airflow-docker/
    5. curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
       Reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
    6. code .

After VS Code is open

Set the Volumes:
  1. Open the terminal 
  2. docker --version
  3. cd sources/
  4. mkdir ./dags ./logs ./config ./plugins
  5. ls
  6. cd ..
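
The directory setup can also be scripted. The following sketch creates the four folders that the compose file mounts and writes an .env file with AIRFLOW_UID, the variable the compose file reads for the container user. The function name `prepare_project` is hypothetical, and the demo runs against a temporary directory with the compose file's default UID of 50000 rather than your real project folder.

```python
import tempfile
from pathlib import Path

# Folders mounted into the Airflow containers by docker-compose.yaml.
AIRFLOW_DIRS = ("dags", "logs", "config", "plugins")


def prepare_project(base: Path, uid: int) -> None:
    """Create the mounted folders and an .env file that sets AIRFLOW_UID."""
    for name in AIRFLOW_DIRS:
        (base / name).mkdir(parents=True, exist_ok=True)
    (base / ".env").write_text(f"AIRFLOW_UID={uid}\n")


# Demo in a throwaway directory so nothing is created in the current folder.
with tempfile.TemporaryDirectory() as tmp:
    prepare_project(Path(tmp), uid=50000)
    created = sorted(p.name for p in Path(tmp).iterdir())
    env_text = (Path(tmp) / ".env").read_text()
print(created)
```

On Linux you would typically pass your own user ID (for example `os.getuid()`) so that files written by the containers are not owned by root.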

Create and Run the Airflow containers
docker-compose up airflow-init
docker-compose 


docker-compose.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.10.2
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # The following line can be used to set a custom config file, stored in the local config folder
    # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
    # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licencing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:
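
The file above relies heavily on the `${VAR:-default}` form, for example `${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}`. The sketch below mimics just that one substitution rule (use the default when the variable is unset or empty) to show how the values resolve. It is not Compose's actual implementation, which supports more forms, and the name `resolve_defaults` is hypothetical.

```python
import re
from typing import Mapping

# Matches ${NAME:-default}; other interpolation forms are intentionally ignored.
_DEFAULT_PATTERN = re.compile(r"\$\{(\w+):-([^}]*)\}")


def resolve_defaults(text: str, env: Mapping[str, str]) -> str:
    """Replace each ${NAME:-default} with env[NAME], or the default if unset/empty."""

    def _substitute(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name, "")
        return value if value else default

    return _DEFAULT_PATTERN.sub(_substitute, text)


image = resolve_defaults("${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}", {})
user = resolve_defaults("${AIRFLOW_UID:-50000}:0", {"AIRFLOW_UID": "1000"})
print(image, user)
```

With no environment set, the image falls back to apache/airflow:2.10.2; with AIRFLOW_UID=1000 in the environment or .env file, the user line becomes 1000:0.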


Pushing the code to repo

create a new repository on the command line
echo "# apache_arflow" >> README.md
git init
git add README.md
git commit -m "first commit"
git branch -M main
git remote add origin https://github.com/tariniteam/apache_arflow.git
git push -u origin main

…or push an existing repository from the command line

git remote add origin https://github.com/tariniteam/apache_arflow.git
git branch -M main
git push -u origin main



Friday, September 27, 2024

Docker Post 6: Step-by-step guide to creating your first Dockerfile with a Python script

Docker makes it easy to package and deploy applications. If you're new to Docker, here's a simple guide to creating your first Dockerfile, which runs a Python script (`hello.py`). This post covers setting up the Dockerfile, creating a basic Python script, and handling dependencies through a `requirements.txt` file.

1. Create the Python script `hello.py`

This script is very simple and prints out a message:

Sunday, September 15, 2024

Post 5: Mounting Multiple Directories to a Single Container Directory Using Docker Run and Symlink



In this post, we will walk through the steps of mounting multiple directories to a single container directory using Docker and symbolic links (symlinks). This can be particularly useful when you need to access the same directory from different paths within your containerized environment.

Step 1: Create the Root Directory
First, you need to create a root directory that will be shared across the symlinks.

For example, let's create a directory named `root_dir1`:

mkdir root_dir1 


Step 2: Create Symlink Directories

Now that we have the root directory, we'll create multiple symlink directories that will all point to the same root directory. Below are instructions for both **Linux Ubuntu** and **Windows PowerShell** users.

For Linux Ubuntu Users:
Use the `ln -s` command to create symbolic links to the root directory:
ln -s /mnt/c/Users/vikik/Projects/PyProjects/root_dir1 sym1
ln -s /mnt/c/Users/vikik/Projects/PyProjects/root_dir1 sym2
ln -s /mnt/c/Users/vikik/Projects/PyProjects/root_dir1 sym3


For Windows PowerShell Users:
Use the `New-Item` command to create symbolic links in PowerShell (on Windows this typically requires an elevated prompt or Developer Mode):

New-Item -ItemType SymbolicLink -Path "sym1" -Target "root_dir1"
New-Item -ItemType SymbolicLink -Path "sym2" -Target "root_dir1"
New-Item -ItemType SymbolicLink -Path "sym3" -Target "root_dir1"

In both cases, `sym1`, `sym2`, and `sym3` will all point to the `root_dir1` directory.
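
The same layout can be checked from Python. This sketch builds root_dir1 and the three symlinks inside a temporary directory (an assumption made so the example does not touch your project folder) and confirms that a file written to the root is visible through every link.

```python
import os
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    root = base / "root_dir1"
    root.mkdir()

    # Create sym1, sym2, sym3, all pointing at root_dir1.
    links = [base / name for name in ("sym1", "sym2", "sym3")]
    for link in links:
        os.symlink(root, link, target_is_directory=True)

    # A file written to the root directory...
    (root / "test.py").write_text("print('hello')\n")

    # ...is reachable through each symlink, and each link resolves to the root.
    visible = [(link / "test.py").exists() for link in links]
    same_target = all(link.resolve() == root.resolve() for link in links)

print(visible, same_target)
```

On Windows, `os.symlink` is subject to the same privilege requirements as creating symlinks from PowerShell.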

Step 3: Create and Run a Python Docker Container

Now, let's create a Python container and mount the root directory (`root_dir1`) to the container's `/app` path.

Command to Run the Container:

docker run -d --name my-python-container -v C:\Users\vikik\Projects\PyProjects\root_dir1:/app python sleep infinity

This command runs a Python container in detached mode with the root directory mounted to the `/app` path inside the container.
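
If you prefer to launch the container from a script, the command can be assembled as an argument list. This is a minimal sketch: the function name `docker_run_command` is hypothetical, and it only builds the arguments for the command shown above without running Docker.

```python
from typing import List


def docker_run_command(
    host_dir: str,
    name: str = "my-python-container",
    mount_point: str = "/app",
) -> List[str]:
    """Build the `docker run` argument list that mounts host_dir at mount_point."""
    return [
        "docker", "run", "-d",
        "--name", name,
        "-v", f"{host_dir}:{mount_point}",
        "python", "sleep", "infinity",
    ]


command = docker_run_command(r"C:\Users\vikik\Projects\PyProjects\root_dir1")
print(" ".join(command))
```

Passing the list to `subprocess.run(command, check=True)` would start the container; keeping the host path as a parameter makes it easy to reuse on a machine with a different project location.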

Access the Container:
To access the container, execute the following:
docker exec -it my-python-container /bin/bash

Once inside the container, update the package index and install the `nano` text editor:

apt update
apt install nano

Now, create a Python script inside the mounted `/app` directory (the shell does not start there by default, so change into it first):
cd /app
nano test.py

Step 4: Test the Symlink Folders

Any file you create in the root directory (such as `test.py`) will be accessible in all the symlink folders (`sym1`, `sym2`, and `sym3`) since they all point to the same directory.

For example, if you add code to `test.py` in the `/app` directory, it will also appear in `sym1`, `sym2`, and `sym3`.

This setup allows you to mount multiple directories to the same container directory using Docker, while maintaining flexibility with symlinks.

Conclusion

By following these steps, you can successfully create multiple symlink directories that all point to the same root directory inside a Docker container. This approach is helpful when managing multiple access points to a shared directory within a containerized environment.
