
Replace manual SSH commands with SSHOperator in Airflow DAGs #90

# =============================================================================
# Airflow Infrastructure DAG Validation
# =============================================================================
# Validates Airflow configuration, DAG syntax, and container builds
# Note: Full VM testing requires self-hosted runner with KVM access
#
# Architecture (ADR-0047):
# - qubinode_navigator: Platform DAGs (dag_factory, rag_*, smoke_test)
# - qubinode-pipelines: Deployment DAGs (infrastructure, OCP, etc.)
# - Both are validated together to match production layout
#
# Jobs (all run on GitHub-hosted runners):
#
# 1. validate-dags:
# - DAG Python syntax (both repos)
# - DAG linting (ADR-0045/ADR-0046 compliance)
# - Airflow DAG imports
#
# 2. validate-containers:
# - docker-compose.yml syntax
# - Container image builds
# - Required files verification
#
# 3. smoke-test (Full Integration):
# - Spins up PostgreSQL + Marquez services
# - Executes smoke_test_dag with OpenLineage
# - Validates lineage data captured in Marquez
# - Tests failure handling path
# - Validates infrastructure DAG structures (from qubinode-pipelines)
# - Tests MCP server startup
#
# What requires self-hosted runner with KVM:
# - VM provisioning (kcli, libvirt)
# - Full infrastructure DAG execution
# - Ansible playbook execution
#
# Cross-Repository Testing:
# - This workflow clones Qubinode/qubinode-pipelines to /opt/qubinode-pipelines
# - qubinode-pipelines has its own CI (dag-validation.yml) for standalone testing
# - Future: Release process will coordinate versions between both repos
# =============================================================================
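# -----------------------------------------------------------------------------
# For context on issue #90: the ADR-0046 lint below flags bash_command strings
# that invoke kcli/virsh without an SSH wrapper. A minimal sketch of the
# SSHOperator-based replacement is shown here for reference; the connection id
# and command are illustrative assumptions, not values taken from either repo.
#
#   from datetime import datetime
#   from airflow import DAG
#   from airflow.providers.ssh.operators.ssh import SSHOperator
#
#   with DAG(dag_id="ssh_pattern_example", start_date=datetime(2025, 1, 1),
#            schedule=None, catchup=False) as dag:
#       list_vms = SSHOperator(
#           task_id="list_vms",
#           ssh_conn_id="qubinode_hypervisor",  # assumed Airflow connection id
#           command="kcli list vm",             # runs on the remote hypervisor
#       )
# -----------------------------------------------------------------------------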
name: Airflow Validation
on:
push:
branches: [main, develop]
paths:
- 'airflow/**'
- '.github/workflows/airflow-validate.yml'
pull_request:
branches: [main]
paths:
- 'airflow/**'
workflow_dispatch:
inputs:
pipelines_ref:
description: 'qubinode-pipelines branch/tag/SHA to test against'
required: false
default: 'main'
type: string
jobs:
# ==========================================================================
# DAG Validation (runs on GitHub-hosted runners)
# ==========================================================================
validate-dags:
name: Validate DAGs
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Checkout qubinode-pipelines
uses: actions/checkout@v6
with:
repository: Qubinode/qubinode-pipelines
ref: ${{ inputs.pipelines_ref || 'main' }}
path: qubinode-pipelines
- name: Setup qubinode-pipelines (production layout)
run: |
# Create /opt directory structure matching production
sudo mkdir -p /opt
sudo ln -s ${{ github.workspace }}/qubinode-pipelines /opt/qubinode-pipelines
echo "[OK] qubinode-pipelines mounted at /opt/qubinode-pipelines"
echo "[INFO] Using ref: ${{ inputs.pipelines_ref || 'main' }}"
ls -la /opt/qubinode-pipelines/dags/ || echo "[WARN] No dags directory found"
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
# Install Airflow 2.10.4 with constraints file to ensure correct version
PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.10.4/constraints-${PYTHON_VERSION}.txt"
echo "Using constraints: $CONSTRAINT_URL"
pip install "apache-airflow==2.10.4" --constraint "$CONSTRAINT_URL"
pip install "apache-airflow-providers-postgres" --constraint "$CONSTRAINT_URL"
pip install "apache-airflow-providers-ssh" --constraint "$CONSTRAINT_URL"
pip install pyyaml
# Find scripts directory
SCRIPTS_DIR=$(python3 -c "import sysconfig; print(sysconfig.get_path('scripts'))")
echo "Scripts directory: $SCRIPTS_DIR"
ls -la "$SCRIPTS_DIR/" | grep -E "airflow|^total" || true
# Verify airflow version and add to PATH
echo "$SCRIPTS_DIR" >> $GITHUB_PATH
if [[ -x "$SCRIPTS_DIR/airflow" ]]; then
"$SCRIPTS_DIR/airflow" version
else
echo "ERROR: airflow binary not found in $SCRIPTS_DIR"
pip show apache-airflow | head -10
exit 1
fi
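# Optional sanity check (sketch only): confirm the pinned providers resolved
# under the constraints file before the DAG checks below run.
#
#   python3 - <<'PY'
#   import airflow
#   from airflow.providers.ssh.operators.ssh import SSHOperator
#   from airflow.providers.postgres.hooks.postgres import PostgresHook
#   print("Airflow", airflow.__version__, "- ssh/postgres providers importable")
#   PY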
- name: Validate DAG syntax
run: |
echo "========================================"
echo "Validating DAG Python syntax"
echo "========================================"
echo ""
echo "--- Platform DAGs (qubinode_navigator) ---"
for dag in airflow/dags/*.py; do
echo "Checking $dag..."
python3 -c "import ast; ast.parse(open('$dag').read())" || exit 1
done
echo "[OK] Platform DAGs have valid Python syntax"
echo ""
echo "--- Deployment DAGs (qubinode-pipelines) ---"
for category in infrastructure ocp networking storage security; do
if [ -d "/opt/qubinode-pipelines/dags/$category" ]; then
for dag in /opt/qubinode-pipelines/dags/$category/*.py; do
if [ -f "$dag" ]; then
echo "Checking $dag..."
python3 -c "import ast; ast.parse(open('$dag').read())" || exit 1
fi
done
fi
done
echo "[OK] Deployment DAGs have valid Python syntax"
- name: Lint DAGs (ADR-0045/ADR-0046)
run: |
chmod +x airflow/scripts/lint-dags.sh
echo "--- Linting Platform DAGs ---"
./airflow/scripts/lint-dags.sh airflow/dags/
echo ""
echo "--- Linting Deployment DAGs ---"
# Lint qubinode-pipelines DAGs (each category)
for category in infrastructure ocp networking storage security; do
if [ -d "/opt/qubinode-pipelines/dags/$category" ]; then
echo "Linting /opt/qubinode-pipelines/dags/$category/"
./airflow/scripts/lint-dags.sh /opt/qubinode-pipelines/dags/$category/ || true
fi
done
- name: Check Airflow DAG imports
run: |
cd airflow
export AIRFLOW_HOME=$(pwd)
export AIRFLOW__CORE__LOAD_EXAMPLES=false
# Use absolute path for SQLite database
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="sqlite:////tmp/airflow.db"
# Verify airflow is accessible
echo "Airflow location: $(which airflow || echo 'not in PATH')"
airflow version || { echo "[ERROR] airflow CLI not found"; exit 1; }
# Initialize minimal Airflow DB
airflow db migrate
# Use Python DagBag API for reliable import error checking
# This is the recommended approach per Airflow best practices
# Test both platform DAGs and deployment DAGs
python3 << 'PYTHON_SCRIPT'
import sys
import os
from pathlib import Path
from airflow.models import DagBag
print("=== Loading DAGs with DagBag ===")
print("")
all_dags = {}
all_errors = {}
# Load platform DAGs (qubinode_navigator)
print("--- Platform DAGs (qubinode_navigator/airflow/dags) ---")
dagbag_platform = DagBag(dag_folder='dags', include_examples=False)
print(f"Loaded {len(dagbag_platform.dags)} platform DAGs")
all_dags.update(dagbag_platform.dags)
all_errors.update(dagbag_platform.import_errors)
# Load deployment DAGs (qubinode-pipelines)
print("")
print("--- Deployment DAGs (qubinode-pipelines) ---")
pipelines_dags_path = '/opt/qubinode-pipelines/dags'
if os.path.exists(pipelines_dags_path):
    for category in ['infrastructure', 'ocp', 'networking', 'storage', 'security']:
        category_path = os.path.join(pipelines_dags_path, category)
        if os.path.exists(category_path) and os.listdir(category_path):
            py_files = [f for f in os.listdir(category_path) if f.endswith('.py') and not f.startswith('__')]
            if py_files:
                print(f" Loading {category}/ ({len(py_files)} files)...")
                dagbag_cat = DagBag(dag_folder=category_path, include_examples=False)
                all_dags.update(dagbag_cat.dags)
                all_errors.update(dagbag_cat.import_errors)
else:
    print(f" [WARN] {pipelines_dags_path} not found")
# Report all loaded DAGs
print(f"\n=== Total Loaded: {len(all_dags)} DAGs ===")
for dag_id in sorted(all_dags.keys()):
    print(f" [OK] {dag_id}")
# Check for import errors
if all_errors:
    print(f"\n=== Import Errors ({len(all_errors)}) ===")
    non_duplicate_errors = []
    duplicate_warnings = []
    for filepath, error in all_errors.items():
        error_str = str(error)
        if "DuplicatedIdException" in error_str:
            duplicate_warnings.append((filepath, error_str))
        else:
            non_duplicate_errors.append((filepath, error_str))
    # Report duplicate warnings (known issue)
    if duplicate_warnings:
        print(f"\n[WARN] {len(duplicate_warnings)} duplicate DAG ID warnings (known issue):")
        for filepath, error in duplicate_warnings:
            print(f" {filepath}")
    # Report real errors
    if non_duplicate_errors:
        print(f"\n[ERROR] {len(non_duplicate_errors)} DAG import errors:")
        for filepath, error in non_duplicate_errors:
            print(f" {filepath}: {error[:200]}")
        sys.exit(1)
print("\n[OK] All DAGs validated successfully")
PYTHON_SCRIPT
# ==========================================================================
# Container Build Validation (runs on GitHub-hosted runners)
# ==========================================================================
validate-containers:
name: Validate Container Build
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Validate docker-compose.yml syntax
run: |
# Verify the Docker Compose v2 plugin (preinstalled on ubuntu-latest runners);
# the legacy pip "docker-compose" would not provide the "docker compose" subcommand used below
docker compose version
cd airflow
docker compose config --quiet && echo "[OK] docker-compose.yml is valid"
- name: Build Airflow image
run: |
cd airflow
docker compose build --no-cache airflow-webserver
echo "[OK] Airflow image built successfully"
- name: Verify required files exist
run: |
echo "Checking required files..."
required_files=(
"airflow/docker-compose.yml"
"airflow/Dockerfile"
"airflow/Makefile"
"airflow/config/marquez.yml"
"airflow/scripts/mcp_server_fastmcp.py"
"airflow/scripts/clear-dag-cache.sh"
"airflow/scripts/init-prerequisites.sh"
"airflow/scripts/lint-dags.sh"
"airflow/init-scripts/001-pgvector-schema.sql"
"airflow/init-scripts/002-marquez-db.sql"
"airflow/dags/smoke_test_dag.py"
)
for file in "${required_files[@]}"; do
if [[ -f "$file" ]]; then
echo "[OK] $file"
else
echo "[ERROR] Missing: $file"
exit 1
fi
done
echo "[OK] All required files exist"
# ==========================================================================
# Smoke Tests (runs on GitHub-hosted runners)
# ==========================================================================
# Tests actual DAG execution with OpenLineage/Marquez integration
smoke-test:
name: Smoke Test (Full Integration)
runs-on: ubuntu-latest
needs: [validate-dags, validate-containers]
services:
# PostgreSQL for Airflow metadata AND Marquez
# Marquez is started manually after DB init (not as service container)
# because it needs the marquez user/database created first
postgres:
image: postgres:15
env:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Checkout qubinode-pipelines
uses: actions/checkout@v6
with:
repository: Qubinode/qubinode-pipelines
ref: ${{ inputs.pipelines_ref || 'main' }}
path: qubinode-pipelines
- name: Setup qubinode-pipelines (production layout)
run: |
# Create /opt directory structure matching production
sudo mkdir -p /opt
sudo ln -s ${{ github.workspace }}/qubinode-pipelines /opt/qubinode-pipelines
echo "[OK] qubinode-pipelines mounted at /opt/qubinode-pipelines"
echo "[INFO] Using ref: ${{ inputs.pipelines_ref || 'main' }}"
ls -la /opt/qubinode-pipelines/dags/ || echo "[WARN] No dags directory found"
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
# Install Airflow 2.10.4 with constraints file to ensure correct version
PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.10.4/constraints-${PYTHON_VERSION}.txt"
echo "Using constraints: $CONSTRAINT_URL"
pip install "apache-airflow==2.10.4" --constraint "$CONSTRAINT_URL"
pip install "apache-airflow-providers-postgres" --constraint "$CONSTRAINT_URL"
pip install "apache-airflow-providers-ssh" --constraint "$CONSTRAINT_URL"
pip install openlineage-airflow
pip install pyyaml
pip install httpx # For MCP server testing
# Find scripts directory and add to PATH
SCRIPTS_DIR=$(python3 -c "import sysconfig; print(sysconfig.get_path('scripts'))")
echo "Scripts directory: $SCRIPTS_DIR"
echo "$SCRIPTS_DIR" >> $GITHUB_PATH
# Verify airflow
if [[ -x "$SCRIPTS_DIR/airflow" ]]; then
"$SCRIPTS_DIR/airflow" version
else
echo "ERROR: airflow binary not found"
pip show apache-airflow | head -10
exit 1
fi
- name: Create Marquez database and user
run: |
echo "Creating Marquez database and user in PostgreSQL..."
echo "(Replicating airflow/init-scripts/002-marquez-db.sql)"
# Wait for PostgreSQL to be fully ready
for i in {1..30}; do
if PGPASSWORD=airflow psql -h localhost -U airflow -d airflow -c '\q' 2>/dev/null; then
echo "[OK] PostgreSQL is accepting connections"
break
fi
echo "Waiting for PostgreSQL... ($i/30)"
sleep 2
done
# Create marquez user and database (same as init-scripts/002-marquez-db.sql)
PGPASSWORD=airflow psql -h localhost -U airflow -d airflow << 'EOF'
CREATE USER marquez WITH PASSWORD 'marquez';
CREATE DATABASE marquez OWNER marquez;
GRANT ALL PRIVILEGES ON DATABASE marquez TO marquez;
EOF
echo "[OK] Marquez database and user created"
- name: Start Marquez containers
run: |
echo "Starting Marquez API container..."
docker run -d \
--name marquez \
--network host \
-e MARQUEZ_PORT=5001 \
-e MARQUEZ_ADMIN_PORT=5002 \
-e POSTGRES_HOST=localhost \
-e POSTGRES_PORT=5432 \
-e POSTGRES_DB=marquez \
-e POSTGRES_USER=marquez \
-e POSTGRES_PASSWORD=marquez \
docker.io/marquezproject/marquez:latest
echo "Starting Marquez Web UI container..."
docker run -d \
--name marquez-web \
--network host \
-e MARQUEZ_HOST=localhost \
-e MARQUEZ_PORT=5001 \
-e WEB_PORT=3000 \
docker.io/marquezproject/marquez-web:latest
echo "[OK] Marquez containers started"
- name: Wait for Marquez to be ready
run: |
echo "Waiting for Marquez API..."
for i in {1..30}; do
if curl -sf http://localhost:5001/api/v1/namespaces > /dev/null 2>&1; then
echo "[OK] Marquez API is ready"
break
fi
echo " Attempt $i/30: Marquez not ready yet..."
# Show container logs on failures for debugging
if [ $i -eq 15 ]; then
echo " Marquez container logs:"
docker logs marquez 2>&1 | tail -20 || true
fi
sleep 5
done
# Verify Marquez is responding
curl -s http://localhost:5001/api/v1/namespaces | head -100 || echo "[WARN] Marquez may not be fully ready"
echo ""
- name: Initialize Airflow DB
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
OPENLINEAGE_URL: http://localhost:5001
OPENLINEAGE_NAMESPACE: smoke_test
run: |
echo "Airflow location: $(which airflow || echo 'not in PATH')"
airflow db migrate  # "db init" is deprecated on Airflow 2.10; matches the validate-dags job
airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
echo "[OK] Airflow DB initialized"
- name: List and validate DAGs
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
run: |
echo "========================================"
echo "DAG Validation"
echo "========================================"
# List all DAGs (duplicate DAG ID warnings are expected - known issue)
airflow dags list || true
# Use list-import-errors to check for real import failures
# Duplicate DAG IDs are a known issue that doesn't prevent execution
echo ""
echo "Checking for import errors..."
IMPORT_ERRORS=$(airflow dags list-import-errors 2>&1 || true)
echo "$IMPORT_ERRORS"
# Count actual errors (excluding duplicate DAG ID warnings)
# grep -c always prints a count (even 0) but exits non-zero on no match, so use "|| true"
ERROR_COUNT=$(echo "$IMPORT_ERRORS" | grep -c "^/" || true)
DUPLICATE_COUNT=$(echo "$IMPORT_ERRORS" | grep -c "DuplicatedIdException" || true)
REAL_ERRORS=$((ERROR_COUNT - DUPLICATE_COUNT))
if [ "$REAL_ERRORS" -gt 0 ]; then
echo "[ERROR] Found $REAL_ERRORS DAG import errors (excluding duplicates)"
exit 1
fi
if [ "$DUPLICATE_COUNT" -gt 0 ]; then
echo "[WARN] Found $DUPLICATE_COUNT duplicate DAG ID warnings (known issue)"
fi
echo ""
echo "[OK] All DAGs loaded successfully"
- name: Execute smoke_test_dag with OpenLineage
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
OPENLINEAGE_URL: http://localhost:5001
OPENLINEAGE_NAMESPACE: smoke_test
OPENLINEAGE_DISABLED: 'false'
run: |
echo "========================================"
echo "Executing Smoke Test DAG"
echo "========================================"
# Run the smoke test DAG - this will emit OpenLineage events
airflow dags test smoke_test_dag 2025-01-01 2>&1 | tee dag_execution.log
# Check for success
if grep -q "\[OK\] Smoke test completed successfully" dag_execution.log; then
echo ""
echo "[OK] Smoke test DAG executed successfully"
else
echo ""
echo "[ERROR] Smoke test DAG failed"
exit 1
fi
- name: Validate OpenLineage events in Marquez
run: |
echo "========================================"
echo "Validating OpenLineage/Marquez Integration"
echo "========================================"
# Give Marquez time to process events
sleep 5
# Check namespaces
echo "Checking Marquez namespaces..."
NAMESPACES=$(curl -sf http://localhost:5001/api/v1/namespaces)
echo "$NAMESPACES" | python3 -m json.tool || echo "$NAMESPACES"
# Check for jobs in the smoke_test namespace
echo ""
echo "Checking for jobs..."
JOBS=$(curl -sf "http://localhost:5001/api/v1/namespaces/smoke_test/jobs" 2>/dev/null || echo '{"jobs":[]}')
echo "$JOBS" | python3 -m json.tool 2>/dev/null || echo "$JOBS"
# Check for any runs
echo ""
echo "Checking for runs..."
RUNS=$(curl -sf "http://localhost:5001/api/v1/jobs/runs" 2>/dev/null || echo '{"runs":[]}')
echo "$RUNS" | python3 -m json.tool 2>/dev/null || echo "$RUNS"
# Verify we have some data (OpenLineage integration working)
if echo "$NAMESPACES" | grep -q "smoke_test\|default"; then
echo ""
echo "[OK] Marquez has namespace data"
else
echo ""
echo "[WARN] No smoke_test namespace found - OpenLineage may not be fully integrated"
fi
echo ""
echo "[OK] Marquez validation complete"
- name: Test failure handling (optional)
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
OPENLINEAGE_URL: http://localhost:5001
OPENLINEAGE_NAMESPACE: smoke_test_failures
continue-on-error: true
run: |
echo "========================================"
echo "Testing Failure Path (Expected to Fail)"
echo "========================================"
# Run with should_fail=True to test error handling
airflow dags test smoke_test_dag 2025-01-02 \
--conf '{"should_fail": true}' 2>&1 | tee failure_test.log || true
if grep -q "Simulated failure" failure_test.log; then
echo ""
echo "[OK] Failure handling works correctly"
else
echo ""
echo "[WARN] Failure test did not trigger expected error"
fi
- name: Validate DAG structures (infrastructure DAGs)
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
run: |
echo "========================================"
echo "Infrastructure DAG Structure Validation"
echo "========================================"
# Validate structure of infrastructure DAGs (can't execute without KVM)
for dag_id in freeipa_deployment dns_management generic_vm_deployment; do
echo ""
echo "Checking DAG: $dag_id"
# Show DAG details (validates structure)
if airflow dags show $dag_id 2>/dev/null; then
echo "[OK] $dag_id structure valid"
# List tasks
echo "Tasks:"
airflow tasks list $dag_id 2>/dev/null | head -20
else
echo "[SKIP] $dag_id not found or has errors"
fi
done
echo ""
echo "[OK] Infrastructure DAG validation complete"
- name: Validate OpenLineage configuration
run: |
echo "========================================"
echo "OpenLineage Configuration Check"
echo "========================================"
# Check that OpenLineage is configured in docker-compose
if grep -q "OPENLINEAGE_URL" airflow/docker-compose.yml; then
echo "[OK] OPENLINEAGE_URL configured"
else
echo "[WARN] OPENLINEAGE_URL not found in docker-compose.yml"
fi
if grep -q "OPENLINEAGE_DISABLED" airflow/docker-compose.yml; then
# Check it's set to false (enabled)
if grep "OPENLINEAGE_DISABLED.*false" airflow/docker-compose.yml; then
echo "[OK] OpenLineage enabled (OPENLINEAGE_DISABLED=false)"
else
echo "[WARN] OpenLineage may be disabled"
fi
fi
# Check Marquez configuration exists
if [[ -f "airflow/config/marquez.yml" ]]; then
echo "[OK] Marquez config file exists"
# Validate YAML syntax
python3 -c "import yaml; yaml.safe_load(open('airflow/config/marquez.yml'))" && \
echo "[OK] marquez.yml is valid YAML" || \
echo "[ERROR] marquez.yml has invalid YAML syntax"
else
echo "[ERROR] airflow/config/marquez.yml not found"
exit 1
fi
echo ""
echo "[OK] OpenLineage/Marquez configuration validated"
- name: Validate ADR-0046 SSH patterns
run: |
echo "========================================"
echo "ADR-0046 SSH Pattern Validation"
echo "========================================"
ERRORS=0
for dag_file in airflow/dags/*.py; do
filename=$(basename "$dag_file")
# Skip non-DAG files
[[ "$filename" == __* ]] && continue
[[ "$filename" == .* ]] && continue
echo "Checking: $filename"
# Check for kcli/virsh commands that are NOT wrapped in SSH
# This is a heuristic check - looks for bare kcli/virsh in bash_command
if grep -E 'bash_command.*kcli|bash_command.*virsh' "$dag_file" | grep -v 'ssh.*root@localhost' | grep -qv "^#"; then
# More detailed check: find kcli/virsh that appear to be direct calls
if grep -B10 'kcli\s' "$dag_file" | grep -q "bash_command=" && \
! grep -B10 'kcli\s' "$dag_file" | grep -q "ssh.*root@localhost"; then
echo " [WARN] Possible bare kcli command (not SSH-wrapped)"
# Don't fail - this is a heuristic
fi
fi
# Note: Triple single quotes are allowed when bash commands contain
# curly braces (awk, bash arrays) that conflict with f-string parsing.
# The lint-dags.sh script handles this validation.
echo " [OK] Passed basic checks"
done
if [[ $ERRORS -gt 0 ]]; then
echo ""
echo "[ERROR] Found $ERRORS ADR violations"
exit 1
fi
echo ""
echo "[OK] ADR-0046 SSH pattern validation passed"
- name: Test MCP Server (FastMCP)
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/airflow/dags
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
OPENLINEAGE_URL: http://localhost:5001
run: |
echo "========================================"
echo "MCP Server Smoke Test"
echo "========================================"
# Install MCP dependencies
pip install mcp starlette uvicorn sse-starlette fastmcp 2>/dev/null || true
# Test MCP server can import without errors
cd airflow/scripts
python3 -c "
import sys
sys.path.insert(0, '.')
# Test import
try:
# Just test that the module syntax is valid
import ast
with open('mcp_server_fastmcp.py', 'r') as f:
ast.parse(f.read())
print('[OK] MCP server syntax valid')
except SyntaxError as e:
print(f'[ERROR] MCP server syntax error: {e}')
sys.exit(1)
"
# Test MCP server can start (quick startup test)
echo "Testing MCP server startup..."
timeout 10 python3 mcp_server_fastmcp.py &
MCP_PID=$!
sleep 3
# Check if it's running
if kill -0 $MCP_PID 2>/dev/null; then
echo "[OK] MCP server started successfully"
kill $MCP_PID 2>/dev/null || true
else
echo "[WARN] MCP server exited (may be expected without full environment)"
fi
# Test MCP server HTTP endpoint if available
if curl -sf http://localhost:8889/sse > /dev/null 2>&1; then
echo "[OK] MCP server SSE endpoint responding"
else
echo "[INFO] MCP server SSE endpoint not available (expected in CI)"
fi
echo ""
echo "[OK] MCP server smoke test complete"
- name: Print test summary
if: always()
run: |
echo ""
echo "========================================"
echo "SMOKE TEST SUMMARY"
echo "========================================"
echo ""
echo "Services tested:"
echo " - PostgreSQL (Airflow metadata)"
echo " - Airflow (DAG execution)"
echo " - Marquez (OpenLineage backend)"
echo " - MCP Server (FastMCP)"
echo ""
echo "Tests performed:"
echo " - DAG import validation"
echo " - smoke_test_dag execution"
echo " - OpenLineage event emission"
echo " - Marquez lineage capture"
echo " - Failure path handling"
echo " - Infrastructure DAG structure validation"
echo " - ADR-0045/ADR-0046 compliance"
echo " - MCP server syntax and startup"
echo ""
echo "========================================"
# ==========================================================================
# Full Integration Tests (requires self-hosted runner with KVM)
# ==========================================================================
# integration-tests:
# name: Integration Tests (KVM Required)
# runs-on: [self-hosted, kvm]
# needs: [validate-dags, validate-containers, smoke-test]
# if: github.event_name == 'push' && github.ref == 'refs/heads/main'
#
# steps:
# - name: Checkout code
# uses: actions/checkout@v6
#
# - name: Start Airflow stack
# run: |
# cd airflow
# make install
#
# - name: Wait for services
# run: |
# sleep 60
# cd airflow
# make health
#
# - name: Run DAG tests
# run: |
# cd airflow
# # Test DAG execution (requires KVM for VM operations)
# podman-compose exec airflow-scheduler airflow dags test freeipa_deployment 2025-01-01
#
# - name: Cleanup
# if: always()
# run: |
# cd airflow
# make uninstall