feat(dag): Add dynamic kcli sudo prefix helper for CI/CD compatibility #91
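
The helper itself is not reproduced on this page, but the idea is to decide at runtime whether kcli invocations need a sudo prefix: interactive hosts typically do, while CI containers that already run as root do not. A minimal sketch of that idea, with every name hypothetical:

import os
import shutil

def kcli_prefix() -> str:
    """Return 'sudo ' when kcli needs elevation, '' otherwise (hypothetical helper)."""
    # Root (the common case inside CI containers) needs no elevation.
    if os.geteuid() == 0:
        return ""
    # No sudo binary on the host: fall back to invoking kcli directly.
    if shutil.which("sudo") is None:
        return ""
    return "sudo "

# Example: building a BashOperator command string from the prefix
deploy_cmd = f"{kcli_prefix()}kcli create vm -p centos9stream my-vm"

The change is exercised by the repository's Airflow DAG CI workflow below.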

name: Airflow DAG CI/CD

on:
  push:
    branches: [ main, develop ]
    paths:
      - 'airflow/dags/**'
      - 'airflow/plugins/**'
      - 'tests/airflow/**'
      - '.github/workflows/airflow-dag-ci.yml'
  pull_request:
    branches: [ main, develop ]
    paths:
      - 'airflow/dags/**'
      - 'airflow/plugins/**'
  workflow_dispatch:

permissions:
  contents: read
  pull-requests: write

env:
  PYTHON_VERSION: '3.12'
  AIRFLOW_VERSION: '2.10.4'
  AIRFLOW_HOME: /tmp/airflow

jobs:
  validate-dags:
    name: Validate Airflow DAGs
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Set up Python ${{ env.PYTHON_VERSION }}
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install Airflow and initialize database
        run: |
          # Ensure the CI setup script is executable (dependencies are installed inline below)
          chmod +x ./airflow/scripts/ci-setup.sh
          # Install dependencies
          python -m pip install --upgrade pip
          echo "Installing Apache Airflow ${{ env.AIRFLOW_VERSION }}..."
          # Use constraints to ensure compatible package versions
          PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
          CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${{ env.AIRFLOW_VERSION }}/constraints-${PYTHON_VERSION}.txt"
          echo "Using constraints: $CONSTRAINT_URL"
          pip install "apache-airflow[postgres,celery]==${{ env.AIRFLOW_VERSION }}" --constraint "$CONSTRAINT_URL"
          echo "Installing SSH provider..."
          pip install "apache-airflow-providers-ssh" --constraint "$CONSTRAINT_URL"
          pip install pytest pytest-asyncio pyyaml
          echo "Airflow packages installed successfully"
          # Install project dependencies
          if [ -f requirements.txt ]; then
            pip install -r requirements.txt
          fi
          # Add the Python scripts directory to PATH and initialize the DB
          SCRIPTS_DIR=$(python3 -c "import sysconfig; print(sysconfig.get_path('scripts'))")
          export PATH="$SCRIPTS_DIR:$PATH"
          echo "$SCRIPTS_DIR" >> $GITHUB_PATH
          # Verify airflow is available
          which airflow
          airflow version
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          airflow db init
          echo "[OK] Airflow installed and database initialized"
      - name: Validate DAG Registry (ADR-0046)
        run: |
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags"
          echo "Validating DAG registry.yaml..."
          if [ -f "airflow/dags/registry.yaml" ]; then
            python << 'PYTHON_SCRIPT'
          import yaml
          import re
          import sys

          REQUIRED_FIELDS = ['component', 'description', 'script_path', 'tags', 'category']
          VALID_CATEGORIES = ['compute', 'network', 'identity', 'storage', 'security', 'monitoring']
          NAMING_PATTERN = r'^[a-z][a-z0-9_]*$'

          with open('airflow/dags/registry.yaml') as f:
              registry = yaml.safe_load(f)

          errors = []
          for dag_config in registry.get('dags', []):
              component = dag_config.get('component', 'unknown')
              # Check required fields
              for field in REQUIRED_FIELDS:
                  if field not in dag_config:
                      errors.append(f"{component}: Missing required field '{field}'")
              # Check naming convention
              if not re.match(NAMING_PATTERN, component):
                  errors.append(f"{component}: Name must be snake_case")
              # Check category
              category = dag_config.get('category', '')
              if category and category not in VALID_CATEGORIES:
                  errors.append(f"{component}: Invalid category '{category}'")

          if errors:
              print("❌ Registry validation failed:")
              for e in errors:
                  print(f"  - {e}")
              sys.exit(1)

          dag_count = len(registry.get('dags', []))
          print(f"✅ Registry valid: {dag_count} DAG definitions")
          for dag in registry.get('dags', []):
              print(f"  - {dag['component']}_deployment ({dag['category']})")
          PYTHON_SCRIPT
          else
            echo "⚠️ No registry.yaml found (optional)"
          fi
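      # For reference, a hypothetical registry.yaml entry that would satisfy the
      # checks above (field names come from REQUIRED_FIELDS; values are illustrative):
      #
      #   dags:
      #     - component: freeipa_server          # snake_case, per NAMING_PATTERN
      #       description: Deploy a FreeIPA identity server with kcli
      #       script_path: scripts/deploy-freeipa.sh
      #       tags: [identity, freeipa]
      #       category: identity                 # one of VALID_CATEGORIES
      #
      # Per the validator's output above, this would surface as the
      # freeipa_server_deployment DAG.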
      - name: Validate DAG Factory Module
        run: |
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags"
          if [ -f "airflow/dags/dag_factory.py" ]; then
            echo "Validating dag_factory.py..."
            python -c "
          import dag_factory
          print('✅ dag_factory module imports successfully')
          print(f'   Version: {dag_factory.__version__}')
          print(f'   Functions: {dag_factory.__all__}')
          "
          else
            echo "⚠️ dag_factory.py not found (optional)"
          fi
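      # The step above and the one below rely on this dag_factory surface,
      # inferred only from the commands in this workflow, not from the module itself:
      #   dag_factory.__version__                     module version string
      #   dag_factory.__all__                         exported names
      #   load_registry_dags(registry_path) -> dict   mapping of dag_id to DAG object
      #   validate_registry_entry(...)                imported alongside it below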
      - name: Test Registry DAG Generation
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags:${PWD}/airflow/plugins"
          if [ -f "airflow/dags/dag_factory.py" ] && [ -f "airflow/dags/registry.yaml" ]; then
            echo "Testing DAG generation from registry..."
            python << 'PYTHON_SCRIPT'
          import sys

          sys.path.insert(0, 'airflow/dags')
          from dag_factory import load_registry_dags, validate_registry_entry

          # Load registry DAGs
          dags = load_registry_dags('airflow/dags/registry.yaml')
          if not dags:
              print("⚠️ No DAGs generated from registry")
              sys.exit(0)

          print(f"✅ Generated {len(dags)} DAGs from registry:")
          for dag_id, dag in dags.items():
              task_count = len(dag.tasks)
              print(f"  - {dag_id}: {task_count} tasks")
          PYTHON_SCRIPT
          else
            echo "⚠️ Skipping registry DAG generation test"
          fi
      - name: Validate DAG Syntax
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}"
          echo "Validating DAG Python syntax..."
          for dag in airflow/dags/*.py; do
            if [[ "$dag" != *"__init__.py"* ]]; then
              echo "Checking $dag..."
              python -m py_compile "$dag"
              echo "✅ $(basename $dag) syntax valid"
            fi
          done
      - name: Test DAG Imports
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags:${PWD}/airflow/plugins"
          echo "Testing DAG imports..."
          for dag in airflow/dags/*.py; do
            if [[ "$dag" != *"__init__.py"* && "$dag" != *"dag_logging_mixin.py"* ]]; then
              dag_name=$(basename "$dag" .py)
              echo "Importing $dag_name..."
              python -c "import $dag_name; print('✅ $dag_name imported successfully')"
            fi
          done
      - name: Validate DAG Definitions
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}"
          # Copy DAGs to the Airflow DAGs folder
          mkdir -p ${{ env.AIRFLOW_HOME }}/dags
          cp -r airflow/dags/* ${{ env.AIRFLOW_HOME }}/dags/ || true
          # Add scripts directory to PATH
          SCRIPTS_DIR=$(python3 -c "import sysconfig; print(sysconfig.get_path('scripts'))")
          export PATH="$SCRIPTS_DIR:$PATH"
          # List and validate DAGs
          echo "Listing DAGs..."
          airflow dags list || echo "⚠️ No DAGs found or error listing"
          # Test DAG loading
          python << 'PYTHON_SCRIPT'
          import sys
          from pathlib import Path

          sys.path.insert(0, 'airflow/dags')
          sys.path.insert(0, 'airflow/plugins')

          dag_files = list(Path('airflow/dags').glob('*.py'))
          dag_files = [f for f in dag_files if not f.name.startswith('__') and f.name != 'dag_logging_mixin.py']
          print(f"Found {len(dag_files)} DAG files to validate")
          for dag_file in dag_files:
              try:
                  with open(dag_file) as f:
                      content = f.read()
                  if 'DAG(' in content or 'dag =' in content.lower():
                      print(f"✅ {dag_file.name} contains DAG definition")
                  else:
                      print(f"⚠️ {dag_file.name} may not contain a DAG definition")
              except Exception as e:
                  print(f"❌ Error reading {dag_file.name}: {e}")
                  sys.exit(1)
          PYTHON_SCRIPT
  test-rag-ingestion-dag:
    name: Test RAG Document Ingestion DAG
    runs-on: ubuntu-latest
    needs: validate-dags
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Set up Python ${{ env.PYTHON_VERSION }}
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install "apache-airflow[postgres]==${{ env.AIRFLOW_VERSION }}"
          pip install pytest langchain chromadb
      - name: Initialize Airflow
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          # Add scripts directory to PATH for this step
          SCRIPTS_DIR=$(python3 -c "import sysconfig; print(sysconfig.get_path('scripts'))")
          export PATH="$SCRIPTS_DIR:$PATH"
          airflow db init
      - name: Validate RAG Ingestion DAG
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags:${PWD}/airflow/plugins"
          if [ -f "airflow/dags/rag_document_ingestion.py" ]; then
            echo "Testing RAG document ingestion DAG..."
            python -c "import rag_document_ingestion; print('✅ RAG ingestion DAG imported')"
            # Check for required components
            python << 'PYTHON_SCRIPT'
          import rag_document_ingestion

          # Verify the DAG object exists
          if hasattr(rag_document_ingestion, 'dag'):
              print("✅ DAG object found")
          else:
              print("⚠️ No DAG object found in rag_document_ingestion.py")

          # Check for task definitions
          content = open('airflow/dags/rag_document_ingestion.py').read()
          if 'PythonOperator' in content or 'BashOperator' in content or '@task' in content:
              print("✅ Contains task definitions")
          else:
              print("⚠️ No task definitions found")
          PYTHON_SCRIPT
          else
            echo "⚠️ rag_document_ingestion.py not found"
          fi
      - name: Test DAG Structure
        run: |
          export AIRFLOW_HOME=${{ env.AIRFLOW_HOME }}
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/dags"
          # Copy DAG to Airflow home
          mkdir -p ${{ env.AIRFLOW_HOME }}/dags
          cp airflow/dags/rag_document_ingestion.py ${{ env.AIRFLOW_HOME }}/dags/ || true
          # Test DAG structure (don't run, just validate)
          python << 'PYTHON_SCRIPT'
          import sys

          try:
              import rag_document_ingestion
              print("✅ RAG ingestion DAG structure valid")
          except ImportError as e:
              print(f"❌ Failed to import RAG ingestion DAG: {e}")
              sys.exit(1)
          except Exception as e:
              print(f"⚠️ Error validating DAG structure: {e}")
          PYTHON_SCRIPT
  test-airflow-plugins:
    name: Test Airflow Plugins
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Set up Python ${{ env.PYTHON_VERSION }}
        uses: actions/setup-python@v6
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install "apache-airflow[postgres]==${{ env.AIRFLOW_VERSION }}"
          pip install fastmcp httpx pytest
      - name: Test MCP Plugin in Airflow
        run: |
          export PYTHONPATH="${PYTHONPATH}:${PWD}/airflow/plugins"
          if [ -d "airflow/plugins/qubinode" ]; then
            echo "Testing Airflow MCP plugin..."
            # Test plugin imports
            python -c "import sys; sys.path.insert(0, 'airflow/plugins'); from qubinode import mcp_server_fastmcp; print('✅ MCP plugin imports in Airflow context')" || echo "⚠️ MCP plugin import failed"
          else
            echo "⚠️ No qubinode plugins directory found"
          fi
      - name: Validate Plugin Structure
        run: |
          if [ -d "airflow/plugins/qubinode" ]; then
            echo "Airflow plugins found:"
            find airflow/plugins/qubinode -name "*.py" -type f
            # Check for __init__.py
            if [ -f "airflow/plugins/qubinode/__init__.py" ]; then
              echo "✅ Plugin package structure valid"
            else
              echo "⚠️ Missing __init__.py in plugin directory"
            fi
          fi
  generate-dag-report:
    name: Generate DAG Report
    runs-on: ubuntu-latest
    needs: [validate-dags, test-rag-ingestion-dag, test-airflow-plugins]
    if: always()
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Generate Report
        run: |
          echo "## Airflow DAG Validation Results" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY
          echo "### DAG Files" >> $GITHUB_STEP_SUMMARY
          dag_count=$(find airflow/dags -name "*.py" -type f ! -name "__init__.py" ! -name "dag_logging_mixin.py" | wc -l)
          echo "- Total DAG files: $dag_count" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY
          echo "### Validation Status" >> $GITHUB_STEP_SUMMARY
          echo "- ✅ DAG syntax validation complete" >> $GITHUB_STEP_SUMMARY
          echo "- ✅ DAG imports tested" >> $GITHUB_STEP_SUMMARY
          echo "- ✅ RAG ingestion DAG validated" >> $GITHUB_STEP_SUMMARY
          echo "- ✅ Airflow plugins tested" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY
          echo "### DAG List" >> $GITHUB_STEP_SUMMARY
          for dag in airflow/dags/*.py; do
            if [[ "$dag" != *"__init__.py"* && "$dag" != *"dag_logging_mixin.py"* ]]; then
              echo "- \`$(basename $dag)\`" >> $GITHUB_STEP_SUMMARY
            fi
          done
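
To reproduce the workflow's import checks locally before pushing, Airflow's DagBag gives an equivalent one-shot verdict. A minimal sketch, assuming it runs from the repository root with the same Airflow version installed:

import sys

from airflow.models import DagBag

# Parse everything under airflow/dags, skipping Airflow's bundled example DAGs
bag = DagBag(dag_folder="airflow/dags", include_examples=False)

if bag.import_errors:
    # import_errors maps each failing file path to its traceback text
    for path, err in bag.import_errors.items():
        print(f"❌ {path}: {err}")
    sys.exit(1)

print(f"✅ {len(bag.dags)} DAGs parsed cleanly: {sorted(bag.dags)}")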