
dbt + Airflow WAP Pipeline

Production-ready data pipeline implementing the WAP (Write-Audit-Publish) pattern using dbt, Airflow, and Snowflake.

[Screenshot: Airflow DAG]

What is WAP?

WAP is a data quality pattern used in production pipelines:

  1. Write - Load data into an audit/staging table
  2. Audit - Run tests to validate data quality
  3. Publish - Only if tests pass, merge into production table

This catches bad data before it hits your production tables and dashboards.
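The three steps can be sketched in plain Python with an in-memory SQLite database. This is an illustration of the pattern only, not this project's code; the table and column names (`tickets`, `tickets_audit`, `ticket_id`, `ticket_price`) are hypothetical stand-ins for the project's actual schema:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE tickets (ticket_id INTEGER PRIMARY KEY, ticket_price REAL)")
con.execute("CREATE TABLE tickets_audit (ticket_id INTEGER, ticket_price REAL)")

# 1. WRITE: load the new batch into the audit table, not production
batch = [(1, 25.0), (2, 30.0)]
con.executemany("INSERT INTO tickets_audit VALUES (?, ?)", batch)

# 2. AUDIT: run data-quality checks against the audit table
dupes = con.execute(
    "SELECT COUNT(*) FROM (SELECT ticket_id FROM tickets_audit "
    "GROUP BY ticket_id HAVING COUNT(*) > 1)"
).fetchone()[0]
bad_prices = con.execute(
    "SELECT COUNT(*) FROM tickets_audit "
    "WHERE ticket_price IS NULL OR ticket_price <= 0"
).fetchone()[0]

# 3. PUBLISH: merge into production only if every check passed
if dupes == 0 and bad_prices == 0:
    con.execute("INSERT OR REPLACE INTO tickets SELECT * FROM tickets_audit")
    con.commit()
else:
    raise ValueError("audit failed; production table untouched")

print(con.execute("SELECT COUNT(*) FROM tickets").fetchone()[0])  # → 2
```

If either check fails, the merge never runs, so production keeps its last known-good state. In this repo, dbt plays the audit role and the merge is handled by an incremental model.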

Tech Stack

| Tool      | Purpose                       |
|-----------|-------------------------------|
| dbt       | Data transformation & testing |
| Airflow   | Orchestration & scheduling    |
| Cosmos    | dbt-Airflow integration       |
| Snowflake | Data warehouse                |
| Astro CLI | Local Airflow development     |

Project Structure

├── dbt_project/
│   ├── models/
│   │   ├── staging/
│   │   │   ├── stg_haunted_house_tickets.sql
│   │   │   └── stg_customer_feedbacks.sql
│   │   └── marts/
│   │       ├── audit_fact_visits.sql    # WAP: Audit table
│   │       ├── audit_fact_visits.yml    # Tests run here
│   │       └── fact_visits.sql          # WAP: Production table
│   └── profiles.yml
│
├── dags/
│   └── dbt/
│       ├── dbt_project_dag.py           # Runs full dbt project
│       └── dbt_wap_dag.py               # Runs WAP pipeline only
│
└── README.md

The WAP Flow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   stg_tickets   │────▶│ audit_fact_     │────▶│  fact_visits    │
│   stg_feedbacks │     │ visits + TESTS  │     │  (incremental)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
      WRITE                   AUDIT                  PUBLISH

Key Files

audit_fact_visits.sql

Filters source data to a specific purchase_date. This is the "staging area" where data is validated before being promoted to production.

audit_fact_visits.yml

Defines data quality tests:

  • unique & not_null on ticket_id
  • is_positive on ticket_price

If any test fails, the pipeline stops. Bad data never reaches production.
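A schema file declaring these tests might look roughly like the sketch below. This is an assumption about the file's shape based on the bullet list above, not its exact contents; in particular, is_positive is assumed to be a custom generic test defined elsewhere in the project:

```yaml
version: 2

models:
  - name: audit_fact_visits
    columns:
      - name: ticket_id
        tests:
          - unique
          - not_null
      - name: ticket_price
        tests:
          - is_positive   # custom generic test, assumed to be defined in the project
```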

fact_visits.sql

Incremental model with merge strategy. Only receives data that passed all tests in the audit table.

{{ config(
    materialized='incremental',
    unique_key='ticket_id',
    incremental_strategy='merge'
) }}

dbt_wap_dag.py

Airflow DAG built with Cosmos. It runs the +fact_visits selector, which builds fact_visits together with all of its upstream models, so the entire WAP chain executes in order.

Running Locally

Prerequisites: Docker, Astro CLI, Snowflake account

# Install dbt packages
cd dbt_project
dbt deps

# Test the pipeline
dbt build --select +fact_visits

# Start Airflow
astro dev start

# Open Airflow UI
open http://localhost:8080

Why This Approach?

| Before (Pure Python)  | After (dbt + Airflow)        |
|-----------------------|------------------------------|
| 300+ lines of code    | 3 SQL files                  |
| Custom test logic     | Built-in dbt tests           |
| Manual orchestration  | Visual DAG in Airflow        |
| Hard to debug         | Click and see what failed    |

Screenshots

WAP Pipeline (dbt_wap_dag)

[Screenshot: WAP DAG]

Full Project Pipeline (dbt_project_dag)

[Screenshot: Full project DAG]


Built by Lubo
