Apache Airflow Data Pipeline
Apache Airflow with the Celery executor, PostgreSQL, Redis, and Flower monitoring
Overview
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows, originally developed by Airbnb in 2014 to solve complex data pipeline orchestration challenges. It allows data engineers to define workflows as code using Python DAGs (Directed Acyclic Graphs), making it the go-to solution for ETL processes, machine learning pipelines, and task automation across thousands of organizations worldwide. The Celery executor enables horizontal scaling by distributing task execution across multiple worker nodes, while PostgreSQL serves as the robust metadata database storing DAG runs, task instances, and connection details. Redis acts as the message broker for Celery, queuing tasks and enabling real-time communication between the scheduler and workers, while Flower provides a web-based monitoring interface for tracking worker performance and task distribution. This production-grade stack solves the fundamental challenge of reliable, scalable workflow orchestration by combining Airflow's rich scheduling capabilities with Celery's distributed task execution, PostgreSQL's ACID compliance for critical metadata, and Redis's sub-millisecond message passing. Data engineering teams, DevOps professionals, and organizations running complex ETL pipelines will benefit from this configuration's ability to handle thousands of concurrent tasks while maintaining visibility into workflow execution, task dependencies, and system health through comprehensive monitoring and alerting capabilities.
Key Features
- Python-based DAG definition with rich operator library for databases, cloud services, and APIs
- CeleryExecutor for horizontal scaling with dynamic worker allocation across multiple nodes
- PostgreSQL metadata database with ACID compliance for reliable DAG run and task state tracking
- Redis message broker enabling sub-millisecond task queuing and worker communication
- Flower monitoring dashboard for real-time Celery worker metrics and task distribution analytics
- Triggerer service for efficient deferrable operator handling without blocking worker slots
- Web UI with Gantt charts, task logs, and DAG visualization for comprehensive workflow monitoring
- Connection and variable management with encryption for secure credential storage
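Connections and Variables can be seeded from the CLI as well as the UI; a minimal sketch, assuming the stack defined below is already running (the connection id, host, credentials, and bucket name are placeholders, not part of this recipe):
terminal
# Connections and Variables are encrypted at rest with FERNET_KEY.
# All values below are placeholders -- replace with your own.
docker compose exec airflow-webserver airflow connections add my_warehouse \
  --conn-type postgres \
  --conn-host warehouse.example.com \
  --conn-login etl_user \
  --conn-password change-me \
  --conn-schema analytics

docker compose exec airflow-webserver airflow variables set data_bucket s3://example-bucket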
Common Use Cases
- ETL pipelines processing data from multiple sources into data warehouses with complex dependencies
- Machine learning workflows orchestrating model training, validation, and deployment across environments
- Data quality monitoring with automated checks, alerts, and remediation workflows
- Multi-cloud data synchronization between AWS S3, Google Cloud Storage, and Azure Blob Storage
- Financial reporting automation with regulatory compliance checks and audit trails
- IoT data processing pipelines handling sensor data ingestion, transformation, and analytics
- Marketing campaign automation with customer segmentation, email triggers, and performance tracking
Prerequisites
- Docker Engine 20.10+ and Docker Compose 2.0+ with at least 6GB available RAM for all services
- Python knowledge for writing DAGs and understanding Airflow's operator patterns
- Basic PostgreSQL administration skills for database maintenance and query optimization
- Understanding of Celery distributed task concepts and worker scaling strategies
- Network ports 8080 (Airflow UI) and 5555 (Flower) available and not blocked by firewalls
- Environment variables configured: POSTGRES_PASSWORD, FERNET_KEY, and ADMIN_PASSWORD
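FERNET_KEY must be a 32-byte url-safe base64 value or credential encryption will fail. A minimal sketch for generating the three required variables, assuming python3 with the cryptography package and openssl are available on the host (the same Fernet one-liner is noted in the .env template below):
terminal
# Generate a Fernet key for AIRFLOW__CORE__FERNET_KEY
python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

# Or append freshly generated secrets to .env in one step
# (unquoted EOF so the $(...) substitutions expand)
cat >> .env << EOF
POSTGRES_PASSWORD=$(openssl rand -hex 16)
ADMIN_PASSWORD=$(openssl rand -hex 16)
FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
EOF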
For development & testing only. Review security settings, change default credentials, and test thoroughly before production use.
docker-compose.yml
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  depends_on:
    - redis
    - postgres

services:
  postgres:
    image: postgres:15-alpine
    container_name: airflow-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=airflow
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    container_name: airflow-redis
    restart: unless-stopped

  airflow-webserver:
    <<: *airflow-common
    container_name: airflow-webserver
    command: webserver
    ports:
      - "${AIRFLOW_PORT:-8080}:8080"
    restart: unless-stopped

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflow-scheduler
    command: scheduler
    restart: unless-stopped

  airflow-worker:
    <<: *airflow-common
    container_name: airflow-worker
    command: celery worker
    restart: unless-stopped

  airflow-triggerer:
    <<: *airflow-common
    container_name: airflow-triggerer
    command: triggerer
    restart: unless-stopped

  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password ${ADMIN_PASSWORD}
    profiles:
      - init

  flower:
    <<: *airflow-common
    container_name: airflow-flower
    command: celery flower
    ports:
      - "${FLOWER_PORT:-5555}:5555"
    restart: unless-stopped

volumes:
  postgres_data:
.env Template
.env
# Apache Airflow Stack
AIRFLOW_PORT=8080
FLOWER_PORT=5555

# Database
POSTGRES_PASSWORD=airflow_password

# Fernet key (generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
FERNET_KEY=your-fernet-key-here

# Admin user
ADMIN_PASSWORD=admin
Usage Notes
- Initialize first: docker compose --profile init up airflow-init
- Airflow UI at http://localhost:8080 (user admin, password from ADMIN_PASSWORD; admin by default)
- Flower (worker monitoring) at http://localhost:5555
- Place DAG files in the ./dags directory (see the example DAG below)
- Scale workers: docker compose up -d --scale airflow-worker=3 (remove container_name from the airflow-worker service first; a fixed name prevents multiple replicas)
- Check task logs in the ./logs directory
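A minimal sketch of a DAG to drop into ./dags, written heredoc-style like the Quick Start below; the dag_id example_etl and the echo commands are placeholders for your own tasks:
terminal
# Write a three-task DAG into the mounted ./dags directory
mkdir -p dags
cat > dags/example_etl.py << 'EOF'
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_etl",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    transform = BashOperator(task_id="transform", bash_command="echo transforming")
    load = BashOperator(task_id="load", bash_command="echo loading")

    extract >> transform >> load
EOF

# New DAGs start paused because DAGS_ARE_PAUSED_AT_CREATION is 'true';
# unpause once it shows up in the UI (or toggle it there)
docker compose exec airflow-webserver airflow dags unpause example_etl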
Individual Services (8 services)
Copy individual services to mix and match with your existing compose files.
postgres
postgres:
image: postgres:15-alpine
container_name: airflow-postgres
restart: unless-stopped
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=airflow
volumes:
- postgres_data:/var/lib/postgresql/data
redis
redis:
image: redis:7-alpine
container_name: airflow-redis
restart: unless-stopped
airflow-webserver
airflow-webserver:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-webserver
command: webserver
ports:
- ${AIRFLOW_PORT:-8080}:8080
restart: unless-stopped
airflow-scheduler
airflow-scheduler:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-scheduler
command: scheduler
restart: unless-stopped
airflow-worker
airflow-worker:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-worker
command: celery worker
restart: unless-stopped
airflow-triggerer
airflow-triggerer:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-triggerer
command: triggerer
restart: unless-stopped
airflow-init
airflow-init:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-init
entrypoint: /bin/bash
command:
- "-c"
- |
airflow db init
airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password ${ADMIN_PASSWORD}
profiles:
- init
flower
flower:
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- redis
- postgres
container_name: airflow-flower
command: celery flower
ports:
- ${FLOWER_PORT:-5555}:5555
restart: unless-stopped
Quick Start
terminal
# 1. Create the compose file
cat > docker-compose.yml << 'EOF'
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${POSTGRES_PASSWORD}@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:${POSTGRES_PASSWORD}@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  depends_on:
    - redis
    - postgres

services:
  postgres:
    image: postgres:15-alpine
    container_name: airflow-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=airflow
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    container_name: airflow-redis
    restart: unless-stopped

  airflow-webserver:
    <<: *airflow-common
    container_name: airflow-webserver
    command: webserver
    ports:
      - "${AIRFLOW_PORT:-8080}:8080"
    restart: unless-stopped

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflow-scheduler
    command: scheduler
    restart: unless-stopped

  airflow-worker:
    <<: *airflow-common
    container_name: airflow-worker
    command: celery worker
    restart: unless-stopped

  airflow-triggerer:
    <<: *airflow-common
    container_name: airflow-triggerer
    command: triggerer
    restart: unless-stopped

  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        airflow db init
        airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password ${ADMIN_PASSWORD}
    profiles:
      - init

  flower:
    <<: *airflow-common
    container_name: airflow-flower
    command: celery flower
    ports:
      - "${FLOWER_PORT:-5555}:5555"
    restart: unless-stopped

volumes:
  postgres_data:
EOF

# 2. Create the .env file
cat > .env << 'EOF'
# Apache Airflow Stack
AIRFLOW_PORT=8080
FLOWER_PORT=5555

# Database
POSTGRES_PASSWORD=airflow_password

# Fernet key (generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
FERNET_KEY=your-fernet-key-here

# Admin user
ADMIN_PASSWORD=admin
EOF

# 3. Initialize the metadata database and admin user
docker compose --profile init up airflow-init

# 4. Start the services
docker compose up -d

# 5. View logs
docker compose logs -f
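A quick way to confirm the stack came up, assuming the default ports from the .env template:
terminal
# Service status for all containers in this project
docker compose ps

# The webserver's health endpoint reports metadatabase and scheduler status
curl -fsS http://localhost:8080/health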
One-Liner
Run this command to download and set up the recipe in one step:
terminal
curl -fsSL https://docker.recipes/api/recipes/airflow-data-pipeline/run | bash
Troubleshooting
- ImportError in DAGs folder: Ensure Python dependencies are installed in Airflow image or mount virtual environment
- Celery workers not picking up tasks: Check Redis connectivity and verify AIRFLOW__CELERY__BROKER_URL environment variable
- Scheduler not creating task instances: Verify DAG file syntax and check scheduler logs for parsing errors
- Database connection timeout: Increase PostgreSQL max_connections and check AIRFLOW__DATABASE__SQL_ALCHEMY_CONN format
- Flower shows no workers: Ensure airflow-worker container is running and Redis broker URL matches across all services
- Tasks stuck in queued state: Scale up workers with --scale airflow-worker=N or check worker resource limits
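A few diagnostic commands that map to the issues above; service names match this recipe, so adjust them if you renamed anything (the dags list-import-errors subcommand is available in recent Airflow 2.x CLIs):
terminal
# Broker reachable? Should print PONG
docker compose exec redis redis-cli ping

# Metadata database accepting connections?
docker compose exec postgres psql -U airflow -d airflow -c "SELECT 1;"

# DAG files the scheduler failed to parse
docker compose exec airflow-scheduler airflow dags list-import-errors

# Follow scheduler and worker logs for queuing or parsing problems
docker compose logs -f airflow-scheduler airflow-worker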
Components
airflow, postgres, redis, celery, flower
Tags
#airflow #data-pipeline #etl #workflow #scheduling #celery
Category
DevOps & CI/CD