IberaSoft/automated-weather-pipeline
☁️ Automated Weather Data Pipeline

A production-ready ETL pipeline that collects weather data daily, processes it, and generates visual reports. Learn data engineering fundamentals with a real-world project.


🎯 What I learned with this project:

  • ETL Pipeline Design: Extract, Transform, Load architecture
  • Workflow Automation: Scheduled daily data collection and processing
  • API Integration: Working with external REST APIs (OpenWeatherMap)
  • Data Processing: Cleaning, aggregating, and analyzing time-series data
  • Database Operations: Schema design, migrations, and queries
  • Reporting: Automated HTML reports with visualizations
  • Production Practices: Error handling, logging, monitoring, Docker deployment

🌟 Key Features

```mermaid
graph LR
    A[⏰ Scheduler<br/>8 AM Daily] --> B[📡 Extract<br/>Weather API]
    B --> C[🔄 Transform<br/>Clean & Process]
    C --> D[💾 Load<br/>PostgreSQL]
    D --> E[📊 Generate<br/>Report]
    E --> F[📧 Notify<br/>Slack]

    style A fill:#e1f5ff
    style C fill:#fff3e0
    style D fill:#e8f5e9
    style E fill:#fce4ec
    style F fill:#f3e5f5
```

Pipeline Capabilities

  • 🌍 Multi-City Tracking: Monitor weather for 10+ configurable cities
  • 📈 Historical Analysis: Compare current vs. past week's weather
  • 📊 Visual Reports: Auto-generated HTML with charts
  • 🔔 Smart Alerts: Notifications for extreme weather conditions
  • 🌬️ Air Quality Monitoring: Track AQI and pollution components (v1.1)
  • ☀️ UV Index Tracking: Monitor UV levels for health awareness (v1.1)
  • 📅 7-Day Forecasts: Extended weather predictions (v1.1)
  • 📊 Historical Averages: Long-term weather pattern analysis (v1.1)
  • 📝 Comprehensive Logging: Track every step of the pipeline
  • 🔄 Automatic Retry: Handles API failures gracefully
  • 💻 Local Development: Test and develop on your machine
  • 🐳 Production Ready: Docker deployment for VPS hosting
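The "Automatic Retry" behavior above can be sketched as a small backoff helper. This is an illustrative sketch, not the repo's actual implementation; the attempt counts and delays are assumptions:

```python
import time

def with_retries(func, *args, attempts=3, base_delay=1.0, **kwargs):
    """Call func, retrying with exponential backoff on failure.

    Illustrative helper: the pipeline's real retry policy may differ.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == attempts:
                raise  # out of retries: let the caller handle it
            # wait 1s, 2s, 4s, ... between attempts
            time.sleep(base_delay * 2 ** (attempt - 1))
```

A call like `with_retries(fetch_weather_data, "London")` would then absorb transient API failures instead of failing the whole run.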

🚀 Quick Start

Prerequisites

  • OpenWeatherMap API key (Get free key)
  • (Optional) Slack webhook for notifications

Choose Your Environment

This project supports two deployment environments:

  1. Local Development - Run on your machine for testing and development
  2. Production - Deploy to VPS (Hostinger) using Docker

💻 Local Development (Recommended for Testing)

Perfect for development, testing, and learning. Run directly on your machine.

Prerequisites for Local Development

  • Python 3.11+
  • PostgreSQL 15+ (or use Docker for PostgreSQL only)
  • OpenWeatherMap API key

Quick Setup (Automated)

```bash
# 1. Clone repository
git clone https://github.com/yourusername/automated-weather-pipeline.git
cd automated-weather-pipeline

# 2. Run automated setup script
chmod +x scripts/setup_local.sh
./scripts/setup_local.sh

# 3. Edit .env.local and add your WEATHER_API_KEY
nano .env.local  # or use your preferred editor

# 4. Activate virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# 5. Initialize database
python database/init_db.py

# 6. Test API connection
python scripts/test_api.py

# 7. Run pipeline manually
python scripts/manual_run.py

# 8. View generated report
open reports/weather_report_*.html  # macOS
# or
xdg-open reports/weather_report_*.html  # Linux
```

Manual Setup

See LOCAL_DEVELOPMENT.md for detailed manual setup instructions.

Running Locally

```bash
# Activate virtual environment
source venv/bin/activate

# Load environment variables
export $(cat .env.local | grep -v '^#' | xargs)
export ENVIRONMENT=local

# Run pipeline manually
python scripts/manual_run.py

# Or use convenience script
chmod +x scripts/run_local.sh
./scripts/run_local.sh
```

📖 Full Local Development Guide: See LOCAL_DEVELOPMENT.md


🐳 Production Deployment (Docker)

Deploy to production using Docker Compose. Recommended for VPS hosting.

Prerequisites for Production

  • Docker & Docker Compose
  • VPS with Docker support (e.g., Hostinger VPS)
  • OpenWeatherMap API key

Quick Setup (Docker)

```bash
# 1. Clone repository
git clone https://github.com/yourusername/automated-weather-pipeline.git
cd automated-weather-pipeline

# 2. Configure environment
cp .env.example .env
# Edit .env and add your WEATHER_API_KEY

# 3. Start services
docker-compose up -d

# 4. Initialize database
docker-compose exec pipeline python database/init_db.py

# 5. Run pipeline manually (test)
docker-compose exec pipeline python scripts/manual_run.py

# 6. Check generated report
ls reports/
# weather_report_2025-03-15.html
```

View Report: Open reports/weather_report_YYYY-MM-DD.html in your browser

📖 Full Production Deployment Guide: See deploy/README.md

📊 Sample Report

The pipeline generates beautiful HTML reports with:


  • ✅ Summary statistics table (avg/min/max temperatures)
  • ✅ Temperature trends chart (last 7 days)
  • ✅ City comparison bar chart
  • ✅ Weather alerts (extreme conditions)
  • ✅ Humidity and wind speed trends

🏗️ Architecture

```mermaid
flowchart TB
    subgraph Scheduler
        A[APScheduler<br/>Cron: 8 AM Daily]
    end

    subgraph Pipeline Stages
        B[1. Extract<br/>OpenWeather API]
        C[2. Transform<br/>Pandas Processing]
        D[3. Load<br/>PostgreSQL]
        E[4. Report<br/>Generate HTML]
        F[5. Notify<br/>Slack/Email]
    end

    subgraph Storage
        G[(PostgreSQL<br/>Raw & Processed Data)]
        H[Reports/<br/>HTML Files]
        I[Logs/<br/>pipeline.log]
    end

    A --> B
    B --> C
    C --> D
    D --> G
    D --> E
    E --> H
    E --> F
    B --> I
    C --> I
    D --> I

    style A fill:#e3f2fd
    style B fill:#fff9c4
    style G fill:#c8e6c9
    style H fill:#f8bbd0
```

Data Flow

🛠️ Tech Stack

| Component | Technology | Purpose |
|---|---|---|
| Language | Python 3.11 | Core pipeline logic |
| Scheduler | APScheduler | Job scheduling |
| Database | PostgreSQL 15 | Data storage |
| Data Processing | Pandas | Data transformation |
| Visualization | Matplotlib | Chart generation |
| API Client | Requests | HTTP calls |
| Logging | Python logging | Monitoring |
| Deployment | Docker Compose | Containerization |
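APScheduler drives the daily 8 AM trigger in the repo; the core "next run" arithmetic it performs can be sketched with the standard library alone (a hypothetical helper, not project code):

```python
from datetime import datetime, timedelta

def next_run(now: datetime, hour: int = 8, minute: int = 0) -> datetime:
    """Return the next daily run time at hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate
```

In the actual pipeline, a `CronTrigger(hour=8, minute=0)` handles this, including edge cases (DST, missed runs) that the sketch ignores.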

📁 Project Structure

```
automated-weather-pipeline/
├── pipeline/
│   ├── extract.py          # Fetch from API
│   ├── transform.py        # Clean & process
│   ├── load.py             # Save to database
│   ├── report.py           # Generate HTML
│   ├── notify.py           # Send notifications
│   └── config.py           # Configuration
│
├── database/
│   ├── schema.sql          # DB tables
│   ├── init_db.py          # Initialize DB
│   └── migrations/         # Schema changes
│
├── scheduler/
│   └── run_pipeline.py     # Scheduling logic
│
├── scripts/
│   ├── manual_run.py       # Run pipeline manually
│   ├── backfill_data.py    # Load historical data
│   └── test_api.py         # Test API connection
│
├── reports/                # Generated reports
│   └── .gitkeep
│
├── tests/
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_integration.py
│
├── utils/
│   ├── logger.py           # Logging setup
│   └── db_connector.py     # DB connection
│
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── .env.example
```

💻 Local Development

For detailed local development setup and instructions, see LOCAL_DEVELOPMENT.md.

Quick Local Setup

```bash
# Automated setup
./scripts/setup_local.sh

# Or manual setup
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Configure .env.local with your API key
cp .env.example .env.local
# Edit .env.local

# Initialize database
export $(cat .env.local | grep -v '^#' | xargs)
export ENVIRONMENT=local
python database/init_db.py

# Run pipeline
python scripts/manual_run.py
```

Using Docker for Local PostgreSQL Only

If you prefer to use Docker just for PostgreSQL while developing locally:

```bash
# Start only PostgreSQL
docker-compose up -d postgres

# Use local Python environment
source venv/bin/activate
export POSTGRES_HOST=localhost
export ENVIRONMENT=local
python scripts/manual_run.py
```

Configuration

Configuration is managed through environment variables. For local development, use .env.local:

```bash
# .env.local
ENVIRONMENT=local
WEATHER_API_KEY=your_api_key
POSTGRES_HOST=localhost
POSTGRES_USER=weather_user
POSTGRES_PASSWORD=weather_pass
POSTGRES_DB=weather_db
LOG_LEVEL=DEBUG
```

📖 Full Guide: See LOCAL_DEVELOPMENT.md for complete instructions.
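Environment-driven configuration like this is typically read once at startup. A minimal sketch of what `pipeline/config.py` might do — the key names and defaults below are assumptions based on the variables above, not the repo's actual code:

```python
import os

def load_settings() -> dict:
    """Read pipeline settings from the environment, with safe defaults.

    Hypothetical loader: key names mirror the .env.local example above.
    """
    return {
        "environment": os.getenv("ENVIRONMENT", "local"),
        # Required: raise KeyError immediately if the API key is missing,
        # rather than failing mid-pipeline
        "api_key": os.environ["WEATHER_API_KEY"],
        "postgres_host": os.getenv("POSTGRES_HOST", "localhost"),
        "postgres_port": int(os.getenv("POSTGRES_PORT", "5432")),
        "log_level": os.getenv("LOG_LEVEL", "INFO"),
    }
```

Failing fast on the required key keeps misconfiguration errors close to their cause.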

📊 Database Schema

```sql
-- Raw weather data
CREATE TABLE raw_weather (
    id SERIAL PRIMARY KEY,
    city VARCHAR(100) NOT NULL,
    temperature FLOAT,           -- Celsius
    feels_like FLOAT,
    humidity INT,                -- Percentage
    pressure INT,                -- hPa
    wind_speed FLOAT,            -- m/s
    wind_direction INT,          -- Degrees
    conditions VARCHAR(50),      -- Clear, Clouds, Rain, etc.
    icon VARCHAR(10),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- PostgreSQL has no inline INDEX clause; indexes are created separately
CREATE INDEX idx_city_timestamp ON raw_weather (city, timestamp);

-- Daily aggregated data
CREATE TABLE daily_weather_summary (
    id SERIAL PRIMARY KEY,
    city VARCHAR(100) NOT NULL,
    date DATE NOT NULL,
    avg_temp FLOAT,
    min_temp FLOAT,
    max_temp FLOAT,
    avg_humidity FLOAT,
    avg_wind_speed FLOAT,
    dominant_conditions VARCHAR(50),
    UNIQUE(city, date)
);

CREATE INDEX idx_city_date ON daily_weather_summary (city, date);

-- Pipeline execution log
CREATE TABLE pipeline_runs (
    id SERIAL PRIMARY KEY,
    run_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20),  -- SUCCESS, FAILED, PARTIAL
    cities_processed INT,
    duration_seconds FLOAT,
    error_message TEXT
);
```
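Because of the `UNIQUE(city, date)` constraint, re-running the pipeline for the same day needs an upsert (`INSERT ... ON CONFLICT ... DO UPDATE`) rather than a plain insert. SQLite supports the same clause, so the idea can be demonstrated without a running PostgreSQL — a sketch only; the repo's actual loader may handle this differently:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE daily_weather_summary (
        city TEXT NOT NULL,
        date TEXT NOT NULL,
        avg_temp REAL,
        UNIQUE (city, date)
    )
""")

upsert = """
    INSERT INTO daily_weather_summary (city, date, avg_temp)
    VALUES (?, ?, ?)
    ON CONFLICT (city, date) DO UPDATE SET avg_temp = excluded.avg_temp
"""
conn.execute(upsert, ("London", "2025-03-15", 14.2))
conn.execute(upsert, ("London", "2025-03-15", 15.5))  # same day: updates in place

rows = conn.execute(
    "SELECT city, date, avg_temp FROM daily_weather_summary"
).fetchall()
```

The second execute updates the existing row instead of violating the unique constraint, so the table ends up with a single London row for that date.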

🔍 Pipeline Components

1. Extract (extract.py)

```python
import logging

import requests
from requests.exceptions import RequestException

logger = logging.getLogger(__name__)

def fetch_weather_data(city: str) -> dict:
    """
    Fetch current weather from OpenWeatherMap API

    Returns:
        {
            'city': 'London',
            'temperature': 15.5,
            'humidity': 72,
            'conditions': 'Clouds',
            ...
        }
    """
    url = f"{API_BASE_URL}/weather"
    params = {
        'q': city,
        'appid': API_KEY,
        'units': 'metric'
    }

    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return parse_api_response(response.json())
    except RequestException as e:
        logger.error(f"Failed to fetch weather for {city}: {e}")
        raise
```

2. Transform (transform.py)

```python
from typing import List

import pandas as pd

def process_weather_data(raw_data: List[dict]) -> pd.DataFrame:
    """
    Clean and aggregate weather data

    Transformations:
    - Handle missing values
    - Convert units
    - Calculate daily aggregates
    - Detect anomalies
    """
    df = pd.DataFrame(raw_data)

    # Handle missing values (assignment form; chained-assignment
    # inplace fillna is deprecated in pandas 2.x)
    df['temperature'] = df['temperature'].fillna(df['temperature'].mean())

    # Add calculated fields
    df['feels_like_diff'] = df['feels_like'] - df['temperature']
    df['heat_index'] = calculate_heat_index(df)

    # Flag extreme conditions
    df['is_extreme'] = (
        (df['temperature'] > ALERT_TEMP_HIGH) |
        (df['temperature'] < ALERT_TEMP_LOW)
    )

    return df
```
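The anomaly flag above is threshold-based. A statistical variant using z-scores can be sketched with the standard library alone — a hypothetical alternative, not the repo's method:

```python
from statistics import mean, stdev

def flag_anomalies(temps, z_threshold=2.0):
    """Mark readings more than z_threshold standard deviations from the mean.

    Returns a list of booleans parallel to `temps`. Illustrative only.
    """
    if len(temps) < 2:
        return [False] * len(temps)
    mu, sigma = mean(temps), stdev(temps)
    if sigma == 0:
        return [False] * len(temps)  # all readings identical
    return [abs(t - mu) / sigma > z_threshold for t in temps]
```

Unlike fixed thresholds, this adapts to each city's normal range, at the cost of needing enough history for a stable mean.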

3. Load (load.py)

```python
import pandas as pd
from sqlalchemy import create_engine

def save_to_database(df: pd.DataFrame) -> None:
    """
    Save processed data to PostgreSQL
    """
    engine = create_engine(DATABASE_URL)

    # Save raw data
    df.to_sql('raw_weather', engine, if_exists='append', index=False)

    # Build the daily summary with named aggregations so the columns
    # match the daily_weather_summary schema (avg_temp, min_temp, ...)
    daily_summary = df.groupby('city').agg(
        avg_temp=('temperature', 'mean'),
        min_temp=('temperature', 'min'),
        max_temp=('temperature', 'max'),
        avg_humidity=('humidity', 'mean'),
        avg_wind_speed=('wind_speed', 'mean'),
    ).reset_index()
    daily_summary['date'] = pd.Timestamp.today().date()

    daily_summary.to_sql(
        'daily_weather_summary',
        engine,
        if_exists='append',
        index=False
    )
```

4. Report (report.py)

```python
def generate_report(date: str) -> str:
    """
    Generate HTML report with visualizations

    Includes:
    - Summary statistics table
    - Temperature trends (line chart)
    - City comparison (bar chart)
    - Weather alerts
    """
    data = query_database(date)

    # Create figures
    fig1 = create_temperature_trend_chart(data)
    fig2 = create_city_comparison_chart(data)

    # Generate HTML
    html = render_template(
        'report_template.html',
        date=date,
        summary=calculate_summary(data),
        charts=[fig1, fig2],
        alerts=detect_alerts(data)
    )

    # Save report
    filename = f"reports/weather_report_{date}.html"
    with open(filename, 'w') as f:
        f.write(html)

    return filename
```
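`render_template` above is a project helper; the same idea can be shown with nothing but the standard library, using `string.Template` to substitute precomputed values into an HTML skeleton (a sketch, not the repo's template engine):

```python
from string import Template

# Minimal stand-in for report_template.html
REPORT_SKELETON = Template("""\
<html><body>
  <h1>Weather report for $date</h1>
  <p>Average temperature: $avg_temp °C</p>
</body></html>
""")

def render_minimal_report(date: str, avg_temp: float) -> str:
    """Fill the skeleton with computed values. Illustrative only."""
    return REPORT_SKELETON.substitute(date=date, avg_temp=avg_temp)
```

A real report would embed the Matplotlib charts (e.g. as base64 `<img>` tags) and loop over cities, which is where a fuller templating library such as Jinja2 earns its keep.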

🧪 Testing

Local Testing

```bash
# Activate virtual environment
source venv/bin/activate

# Run all tests
pytest tests/ -v

# Run specific test file
pytest tests/test_extract.py -v

# Run with coverage
pytest tests/ --cov=pipeline --cov-report=html

# Integration test (requires DB)
pytest tests/test_integration.py -v
```

Docker Testing

```bash
# Run tests in Docker container
docker-compose exec pipeline pytest tests/ -v

# Or use Makefile
make test
```

Test Coverage

  • pipeline/extract.py 92%
  • pipeline/transform.py 88%
  • pipeline/load.py 95%
  • pipeline/report.py 85%
  • Overall 90%

📝 Logging

Logs are stored in logs/pipeline.log

📊 Monitoring

View Pipeline Status

```bash
# Check logs
docker-compose logs -f pipeline

# Query recent pipeline runs
docker-compose exec postgres psql -U weather_user -d weather_db -c "
SELECT run_date, status, cities_processed, duration_seconds
FROM pipeline_runs
ORDER BY run_date DESC
LIMIT 10;
"
```

Database Queries

```sql
-- Check latest weather data
SELECT city, temperature, humidity, conditions, timestamp
FROM raw_weather
WHERE DATE(timestamp) = CURRENT_DATE
ORDER BY timestamp DESC;

-- Get weekly temperature trends
SELECT
    city,
    DATE(timestamp) as date,
    AVG(temperature) as avg_temp,
    MIN(temperature) as min_temp,
    MAX(temperature) as max_temp
FROM raw_weather
WHERE timestamp >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY city, DATE(timestamp)
ORDER BY city, date;

-- Find extreme weather events
SELECT city, temperature, conditions, timestamp
FROM raw_weather
WHERE temperature > 35 OR temperature < -10
ORDER BY timestamp DESC
LIMIT 20;
```

🔧 Manual Operations

Run Pipeline Immediately

```bash
docker-compose exec pipeline python scripts/manual_run.py
```

Backfill Historical Data

```bash
# Load past 30 days
docker-compose exec pipeline python scripts/backfill_data.py --days 30

# Load specific date range
docker-compose exec pipeline python scripts/backfill_data.py \
  --start 2026-01-01 \
  --end 2026-01-15
```

Test API Connection

```bash
docker-compose exec pipeline python scripts/test_api.py

# Output:
# ✓ API key valid
# ✓ Successfully fetched weather for London
# ✓ All cities accessible
```

🔒 Environment Variables

Local Development (.env.local)

```bash
# Environment
ENVIRONMENT=local

# Required
WEATHER_API_KEY=your_openweathermap_api_key

# Database (Local PostgreSQL)
POSTGRES_USER=weather_user
POSTGRES_PASSWORD=weather_pass
POSTGRES_DB=weather_db
POSTGRES_HOST=localhost  # Use localhost for local dev
POSTGRES_PORT=5432

# Optional - Notifications
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
EMAIL_SMTP_HOST=smtp.gmail.com
EMAIL_SMTP_PORT=587
EMAIL_FROM=your.email@example.com
EMAIL_TO=recipient@example.com
EMAIL_PASSWORD=your_app_password

# Pipeline Settings (Local)
LOG_LEVEL=DEBUG  # More verbose for development
REPORT_RETENTION_DAYS=30
SCHEDULE_HOUR=8
SCHEDULE_MINUTE=0
```

Production (.env)

```bash
# Environment
ENVIRONMENT=production

# Required
WEATHER_API_KEY=your_openweathermap_api_key

# Database (Docker service name)
POSTGRES_USER=weather_user
POSTGRES_PASSWORD=weather_pass
POSTGRES_DB=weather_db
POSTGRES_HOST=postgres  # Use service name in Docker
POSTGRES_PORT=5432

# Optional - Notifications
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
EMAIL_SMTP_HOST=smtp.gmail.com
EMAIL_SMTP_PORT=587
EMAIL_FROM=your.email@example.com
EMAIL_TO=recipient@example.com
EMAIL_PASSWORD=your_app_password

# Pipeline Settings (Production)
LOG_LEVEL=INFO  # Less verbose for production
REPORT_RETENTION_DAYS=30
SCHEDULE_HOUR=8
SCHEDULE_MINUTE=0
```

Environment Differences

| Setting | Local Development | Production |
|---|---|---|
| ENVIRONMENT | local | production |
| POSTGRES_HOST | localhost | postgres (Docker service) |
| LOG_LEVEL | DEBUG | INFO |
| Database Setup | Manual PostgreSQL | Docker container |
| Python Environment | Virtual environment | Docker container |

🐛 Troubleshooting

API rate limit exceeded

Symptoms: HTTP 429 Too Many Requests

Solutions:

  1. Reduce number of cities in config.py
  2. Increase time between API calls
  3. Upgrade to paid API tier
  4. Use caching for recent data
```python
# In extract.py, add delay between requests
import time

for city in CITIES:
    fetch_weather_data(city)
    time.sleep(1)  # 1 second delay
```
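Suggestion 4 (caching) can be as simple as a per-city TTL cache, so repeated runs within a short window reuse the last response instead of spending an API call. This is a hedged sketch, not something in the repo:

```python
import time

_cache = {}  # city -> (fetched_at, data)

def cached_fetch(city, fetch, ttl_seconds=600):
    """Return cached weather for `city` if fresh, otherwise call fetch(city).

    Illustrative: `fetch` stands in for the pipeline's fetch_weather_data.
    """
    now = time.monotonic()
    hit = _cache.get(city)
    if hit is not None and now - hit[0] < ttl_seconds:
        return hit[1]  # cache hit: skip the API call
    data = fetch(city)
    _cache[city] = (now, data)
    return data
```

With a 10-minute TTL, a retried or manually re-run pipeline stops double-counting against the 1,000 calls/day free-tier quota.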
Database connection failed

Symptoms: could not connect to server

Solutions:

  1. Check PostgreSQL is running:
   docker-compose ps
  2. Verify credentials in .env
  3. Check PostgreSQL logs:
   docker-compose logs postgres
Missing reports

Symptoms: No HTML files in reports/ directory

Solutions:

  1. Check pipeline logs for errors
  2. Verify reports/ directory exists and is writable
  3. Run pipeline manually to see detailed errors:
   python scripts/manual_run.py
Charts not displaying in report

Symptoms: HTML report opens but no charts visible

Solutions:

  1. Check Matplotlib is installed
  2. Verify data exists for the date range
  3. Check report generation logs:
   grep "Report" logs/pipeline.log

💰 Cost Estimation

| Service | Cost | Notes |
|---|---|---|
| OpenWeatherMap API | Free | 1,000 calls/day on free tier |
| PostgreSQL | Free | Docker container |
| Storage | ~$0 | <100MB for 90 days |
| Compute | ~$5-10/month | VPS for Docker (optional) |
| Notifications | Free | Slack webhooks |

Total: $0-10/month depending on hosting

🗺️ Roadmap

v1.1 - Enhanced Data ✅ (Released)

  • Air quality index (AQI) tracking
  • UV index monitoring
  • Weather forecasts (7-day ahead)
  • Historical weather averages

v1.2 - Better Reporting (Planned)

  • Interactive charts with Plotly
  • Export reports as PDF
  • Email reports automatically
  • Custom report templates

v2.0 - Advanced Features (Future)

  • Real-time streaming with Kafka
  • Predictive analytics (forecast accuracy)
  • Anomaly detection (unusual patterns)
  • Web dashboard with live data
  • Mobile app notifications

v3.0 - Production Scale (Future)

  • Deploy to AWS/GCP with Terraform
  • Airflow for complex workflows
  • Data quality monitoring
  • Multi-region support

🎓 Learning Resources

Data Engineering Fundamentals

API Integration

Scheduling & Automation

🤝 Contributing

Contributions welcome! This is a learning project.

Ideas for contributions:

  • Add new data sources (air quality, UV index)
  • Improve chart visualizations
  • Add more tests
  • Create Airflow DAG version
  • Write blog post about the project

How to contribute:

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes with tests
  4. Submit a Pull Request

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • OpenWeatherMap for free weather API
  • PostgreSQL community
  • Inspired by real-world data pipelines at tech companies

⭐ If this project helped you learn, please star it!

Built with ❤️ by an aspiring AI/ML Engineer
