diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..6e78a70 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,60 @@ +name: Build Docker Image + +on: + push: + branches: + - main + - feat/prometheus-metrics-integration + tags: + - 'v*' + workflow_dispatch: + +permissions: + contents: read + packages: write + +env: + REGISTRY: ghcr.io + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set image name (lowercase) + id: image + run: echo "name=$(echo ${{ github.repository_owner }}/data-pipeline | tr '[:upper:]' '[:lower:]')" >> $GITHUB_OUTPUT + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GHCR_PAT || secrets.GITHUB_TOKEN }} + + - name: Sanitize branch name for Docker tag + id: tag + run: echo "name=$(echo ${{ github.ref_name }} | sed 's/\//-/g')" >> $GITHUB_OUTPUT + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . 
+ file: docker/Dockerfile + platforms: linux/amd64 + push: true + tags: ${{ env.REGISTRY }}/${{ steps.image.outputs.name }}:${{ steps.tag.outputs.name }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Image summary + run: | + echo "### Docker Image Built 🐳" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "**Image:** ${{ env.REGISTRY }}/${{ steps.image.outputs.name }}:${{ steps.tag.outputs.name }}" >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 07fca16..d27b937 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,6 +7,10 @@ on: branches: [ main, feat/performance-validation ] workflow_dispatch: +permissions: + contents: read + packages: write + jobs: test: runs-on: ubuntu-latest @@ -52,3 +56,29 @@ jobs: with: files: ./coverage.xml fail_ci_if_error: false + + integration-tests: + runs-on: ubuntu-latest + if: github.event_name == 'pull_request' + strategy: + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install uv + uses: astral-sh/setup-uv@v3 + with: + version: "latest" + + - name: Install dependencies + run: uv sync --all-extras + + - name: Run integration tests + run: uv run pytest tests/integration/ -v --tb=short diff --git a/README.md b/README.md index 90f8d23..576cf8a 100644 --- a/README.md +++ b/README.md @@ -1,123 +1,122 @@ # EOPF GeoZarr Data Pipeline -Automated pipeline for converting Sentinel Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration and interactive visualization. +Automated Kubernetes pipeline for converting Sentinel Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration. -## Quick Start (30 seconds) +## Quick Start ```bash -# 1. 
Submit workflow export KUBECONFIG=.work/kubeconfig kubectl create -f workflows/run-s1-test.yaml -n devseed-staging - -# 2. Monitor -kubectl logs -n devseed-staging -l workflows.argoproj.io/workflow= -c main -f +kubectl get wf -n devseed-staging -w ``` -šŸ“– **New here?** [GETTING_STARTED.md](GETTING_STARTED.md) • **Details:** [Full docs below](#submitting-workflows) +šŸ“– **First time?** See [GETTING_STARTED.md](GETTING_STARTED.md) for full setup +šŸŽÆ **Monitor:** [Argo UI](https://argo-workflows.hub-eopf-explorer.eox.at) ## What It Does -**Input:** STAC item URL → **Output:** Interactive web map in ~15-20 minutes - -``` -Convert (15 min) → Register (30 sec) → Augment (10 sec) -``` - -**Supports:** Sentinel-1 GRD (SAR) • Sentinel-2 L2A (optical) - -**Prerequisites:** Kubernetes with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) • Python 3.11+ • [GETTING_STARTED.md](GETTING_STARTED.md) for full setup - -## Submitting Workflows +**Input:** STAC item URL → **Output:** Cloud-optimized GeoZarr + Interactive map (~15-20 min) -| Method | Best For | Setup | Status | -|--------|----------|-------|--------| -| šŸŽÆ **kubectl** | Testing, CI/CD | None | āœ… Recommended | -| šŸ““ **Jupyter** | Learning, exploration | 2 min | āœ… Working | -| ⚔ **Event-driven** | Production (auto) | In-cluster | āœ… Running | -| šŸ **Python CLI** | Scripting | Port-forward | āš ļø Advanced | +**Supports:** Sentinel-1 GRD, Sentinel-2 L2A +**Stack:** Argo Workflows • [eopf-geozarr](https://github.com/EOPF-Explorer/data-model) • Dask • RabbitMQ • Prometheus +**Resources:** 6Gi memory, burstable CPU per workflow -
-kubectl (recommended) +## Monitoring ```bash -export KUBECONFIG=.work/kubeconfig -kubectl create -f workflows/run-s1-test.yaml -n devseed-staging -o name -kubectl logs -n devseed-staging -l workflows.argoproj.io/workflow= -c main -f +# Health check +kubectl get wf -n devseed-staging --field-selector status.phase=Running + +# Recent workflows (last hour) +kubectl get wf -n devseed-staging --sort-by=.metadata.creationTimestamp | tail -10 ``` -Edit `workflows/run-s1-test.yaml` with your STAC URL and collection. -
-
-Jupyter +**Web UI:** [Argo Workflows](https://argo-workflows.hub-eopf-explorer.eox.at) +## Usage + +### kubectl (Testing) ```bash -uv sync --extra notebooks -cp notebooks/.env.example notebooks/.env -uv run jupyter lab notebooks/operator.ipynb +kubectl create -f workflows/run-s1-test.yaml -n devseed-staging ``` -
-
-Event-driven (production) +**Namespaces:** `devseed-staging` (testing) • `devseed` (production) +### Event-driven (Production) Publish to RabbitMQ `geozarr` exchange: ```json -{"source_url": "https://stac.../items/S1A_...", "item_id": "S1A_IW_GRDH_...", "collection": "sentinel-1-l1-grd-dp-test"} +{"source_url": "https://stac.../items/...", "item_id": "...", "collection": "..."} ``` -
- -
-Python CLI +### Jupyter Notebooks ```bash -kubectl port-forward -n core svc/rabbitmq 5672:5672 -export AMQP_PASSWORD=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d) -uv run python examples/submit.py --stac-url "..." --collection sentinel-2-l2a +uv sync --extra notebooks +cp notebooks/.env.example notebooks/.env +uv run jupyter lab notebooks/ ``` -
-**Related:** [data-model](https://github.com/EOPF-Explorer/data-model) • [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) • [Testing report](docs/WORKFLOW_SUBMISSION_TESTING.md) +See [examples/](examples/) for more patterns. ## Configuration -
-S3 & RabbitMQ - ```bash -# S3 credentials +# S3 credentials (OVH S3) kubectl create secret generic geozarr-s3-credentials -n devseed \ - --from-literal=AWS_ACCESS_KEY_ID="" \ - --from-literal=AWS_SECRET_ACCESS_KEY="" + --from-literal=AWS_ACCESS_KEY_ID="..." \ + --from-literal=AWS_SECRET_ACCESS_KEY="..." \ + --from-literal=AWS_ENDPOINT_URL="https://s3.de.io.cloud.ovh.net" + +# S3 output location +# Bucket: esa-zarr-sentinel-explorer-fra +# Prefix: tests-output (staging) or geozarr (production) -# RabbitMQ password +# Get RabbitMQ password kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d -``` -**Endpoints:** S3: `s3.de.io.cloud.ovh.net/esa-zarr-sentinel-explorer-fra` • RabbitMQ: `geozarr` exchange • [UIs](https://workspace.devseed.hub-eopf-explorer.eox.at/): [Argo](https://argo-workflows.hub-eopf-explorer.eox.at) • [STAC](https://api.explorer.eopf.copernicus.eu/stac) • [Viewer](https://api.explorer.eopf.copernicus.eu/raster) -
+# STAC API endpoints +# STAC API: https://api.explorer.eopf.copernicus.eu/stac +# Raster API: https://api.explorer.eopf.copernicus.eu/raster +``` ## Troubleshooting -
-Logs & Issues - ```bash -kubectl get wf -n devseed-staging -w +# Check workflow status +kubectl get wf -n devseed-staging --sort-by=.metadata.creationTimestamp | tail -5 + +# View logs kubectl logs -n devseed-staging -c main -f -kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50 + +# Check resources +kubectl top nodes ``` -**Common fixes:** Workflow not starting → check sensor logs • S3 denied → verify `geozarr-s3-credentials` secret • RabbitMQ refused → `kubectl port-forward -n core svc/rabbitmq 5672:5672` • Pod pending → check resources -
+**Common issues:** +- **Workflow not starting:** Check sensor logs: `kubectl logs -n devseed -l sensor-name=geozarr-sensor` +- **S3 errors:** Verify credentials secret exists +- **Pod pending:** Check node capacity with `kubectl top nodes` + +**Performance:** S1 GRD (10GB): 15-20 min • S2 L2A (5GB): 8-12 min • Increase if >20GB dataset + +See [GETTING_STARTED.md](GETTING_STARTED.md#troubleshooting) for more. ## Development ```bash -uv sync --all-extras && pre-commit install -make test # or: pytest tests/ -v -k e2e +# Setup +uv sync --all-extras +pre-commit install + +# Test +pytest tests/ -v # 100/100 passing + +# Deploy +kubectl apply -f workflows/template.yaml -n devseed ``` -**Deploy:** Edit `workflows/template.yaml` or `scripts/*.py` → `pytest tests/ -v` → `docker buildx build --platform linux/amd64 -t ghcr.io/eopf-explorer/data-pipeline:dev .` → `kubectl apply -f workflows/template.yaml -n devseed` • [CONTRIBUTING.md](CONTRIBUTING.md) +**Project structure:** `workflows/` (manifests) • `scripts/` (Python utils) • `tests/` (pytest) • `notebooks/` (tutorials) + +**Documentation:** [CONTRIBUTING.md](CONTRIBUTING.md) • [GETTING_STARTED.md](GETTING_STARTED.md) ## License diff --git a/docker/Dockerfile b/docker/Dockerfile index 24dd1d4..3532a2e 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,19 +16,20 @@ WORKDIR /app # Install uv for fast dependency resolution RUN pip install -U pip uv -# Cachebust for data-model installation (change timestamp to force fresh install) -ARG CACHEBUST=2025-10-09T00:00:00Z +# Use git commit SHA for precise cache control +# Update via: docker build --build-arg DATA_MODEL_COMMIT=$(git ls-remote https://github.com/EOPF-Explorer/data-model.git refs/heads/fix/s1-encoding-conflict | cut -f1) +ARG DATA_MODEL_COMMIT=fix/s1-encoding-conflict -# Install eopf-geozarr from fix/s1-encoding-conflict branch (includes dask[distributed]) +# Install eopf-geozarr from data-model (includes dask[distributed]) RUN uv pip install --system 
--no-cache \ - git+https://github.com/EOPF-Explorer/data-model.git@fix/s1-encoding-conflict \ - pystac>=1.10.0 \ - httpx>=0.27.0 \ - boto3>=1.34.0 \ - tenacity>=8.0.0 - -# Force fresh copy of scripts (invalidate cache) -ARG SCRIPTS_VERSION=2025-10-09T00:00:00Z + git+https://github.com/EOPF-Explorer/data-model.git@${DATA_MODEL_COMMIT} + +# Copy project files for dependency installation +COPY pyproject.toml README.md /app/ +RUN uv pip install --system --no-cache /app + +# Copy scripts (cache invalidated by content changes, not manual ARG) +ARG SCRIPTS_VERSION=auto # Copy scripts COPY scripts/ /app/scripts/ diff --git a/docs/prometheus-metrics.md b/docs/prometheus-metrics.md new file mode 100644 index 0000000..f4220df --- /dev/null +++ b/docs/prometheus-metrics.md @@ -0,0 +1,100 @@ +# Prometheus Metrics + +## Metrics Collected + +Pipeline scripts expose Prometheus metrics for observability. Metrics server runs on port 8000 in workflow pods. + +### STAC Registration (`register_stac.py`) +```python +stac_registration_total{collection, operation, status} +# operation: create|update|skip|replace +# status: success|error +# Track failures, operation distribution + +stac_http_request_duration_seconds{operation, endpoint} +# operation: get|put|post|delete +# endpoint: item|items +# STAC API latency, set SLOs +``` + +### Preview Generation (`augment_stac_item.py`) +```python +preview_generation_duration_seconds{collection} +# Augmentation performance by collection + +preview_http_request_duration_seconds{operation, endpoint} +# operation: get|put +# STAC API response times during augmentation +``` + +## Key Queries + +**Success Rate (SLO: >99%)** +```promql +sum(rate(stac_registration_total{status="success"}[5m])) / sum(rate(stac_registration_total[5m])) +``` + +**Errors by Collection** +```promql +sum(rate(stac_registration_total{status="error"}[5m])) by (collection) +``` + +**STAC API Latency P95 (SLO: <500ms)** +```promql +histogram_quantile(0.95, 
rate(stac_http_request_duration_seconds_bucket[5m])) by (operation) +``` + +**Preview Duration P95 (SLO: <10s)** +```promql +histogram_quantile(0.95, rate(preview_generation_duration_seconds_bucket[5m])) by (collection) +``` + +**Throughput (items/min)** +```promql +sum(rate(stac_registration_total[5m])) * 60 +``` + +## Setup + +Prometheus scrapes via PodMonitor (deployed in `platform-deploy/workspaces/devseed*/data-pipeline/`). + +**Verify:** +```bash +kubectl port-forward -n core svc/prometheus-operated 9090:9090 +# http://localhost:9090/targets → "geozarr-workflows" +``` + +## Grafana Dashboards + +- **Overview**: Success rate, throughput, error rate by collection +- **Performance**: P95 latencies (STAC API, preview generation) +- **Capacity**: Peak load, processing rate trends + +## Alerts + +**High Failure Rate** +```yaml +expr: rate(stac_registration_total{status="error"}[5m]) / rate(stac_registration_total[5m]) > 0.1 +for: 5m +# Check STAC API status, verify auth tokens +``` + +**Slow Preview Generation** +```yaml +expr: histogram_quantile(0.95, rate(preview_generation_duration_seconds_bucket[5m])) > 60 +for: 10m +# Check TiTiler API or asset access +``` + +**STAC API Latency** +```yaml +expr: histogram_quantile(0.95, rate(stac_http_request_duration_seconds_bucket[5m])) > 1 +for: 10m +# Database overload or network issues +``` + +## SLOs + +- **Success Rate**: >99% +- **STAC API P95**: <500ms +- **Preview P95**: <10s diff --git a/notebooks/02_pyramid_performance.ipynb b/notebooks/02_pyramid_performance.ipynb index 8ce3716..4ae268a 100644 --- a/notebooks/02_pyramid_performance.ipynb +++ b/notebooks/02_pyramid_performance.ipynb @@ -399,7 +399,7 @@ "plt.show()\n", "\n", "print(\n", - " f\"\\nšŸ“Š Key Metric: {np.mean([s for z, s in zip(zooms, [measured[i]/expected[i] for i in range(len(zooms))], strict=False) if z <= 10]):.1f}Ɨ average speedup at production-relevant zooms\"\n", + " f\"\\nšŸ“Š Key Metric: {np.mean([s for z, s in zip(zooms, [measured[i] / 
expected[i] for i in range(len(zooms))], strict=False) if z <= 10]):.1f}Ɨ average speedup at production-relevant zooms\"\n", ")" ] }, @@ -426,15 +426,15 @@ "print(\"Return on Investment:\")\n", "print(\"=\" * 60)\n", "print(\"Storage Cost:\")\n", - "print(f\" Native only: {native_storage:,} pixels ({native_storage/1e6:.0f} MB uncompressed)\")\n", - "print(f\" With pyramids: {total_storage:,} pixels ({total_storage/1e6:.0f} MB uncompressed)\")\n", + "print(f\" Native only: {native_storage:,} pixels ({native_storage / 1e6:.0f} MB uncompressed)\")\n", + "print(f\" With pyramids: {total_storage:,} pixels ({total_storage / 1e6:.0f} MB uncompressed)\")\n", "print(f\" Overhead: +{overhead_pct:.0f}%\")\n", "print(\"\\nPerformance Gain:\")\n", "print(\n", - " f\" z6-10 (low zoom): {np.mean([measured[i]/expected[i] for i, z in enumerate(zooms) if z <= 10]):.1f}Ɨ faster\"\n", + " f\" z6-10 (low zoom): {np.mean([measured[i] / expected[i] for i, z in enumerate(zooms) if z <= 10]):.1f}Ɨ faster\"\n", ")\n", "print(\n", - " f\" z12-14 (high zoom): {np.mean([measured[i]/expected[i] for i, z in enumerate(zooms) if z >= 12]):.1f}Ɨ faster\"\n", + " f\" z12-14 (high zoom): {np.mean([measured[i] / expected[i] for i, z in enumerate(zooms) if z >= 12]):.1f}Ɨ faster\"\n", ")\n", "print(\"\\nProduction Impact:\")\n", "print(\" • Consistent 100-200ms tile generation across all zooms\")\n", diff --git a/pyproject.toml b/pyproject.toml index b62632b..49a2d79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "pika>=1.3.0", "tenacity>=8.0.0", "requests>=2.31.0", + "prometheus-client>=0.19.0", ] [project.optional-dependencies] @@ -55,6 +56,7 @@ packages = ["scripts"] [tool.pytest.ini_options] minversion = "8.0" testpaths = ["tests"] +pythonpath = ["scripts"] # Fix import resolution for tests python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] diff --git a/scripts/augment_stac_item.py b/scripts/augment_stac_item.py index 
8d28d09..578dc6d 100644 --- a/scripts/augment_stac_item.py +++ b/scripts/augment_stac_item.py @@ -13,6 +13,7 @@ import httpx import s3fs import zarr +from metrics import PREVIEW_GENERATION_DURATION, PREVIEW_HTTP_REQUEST_DURATION from pystac import Asset, Item, Link from pystac.extensions.projection import ProjectionExtension @@ -1049,21 +1050,23 @@ def _request( def http_get(url: str, headers: dict[str, str]) -> dict[str, Any]: - data = _request("GET", url, headers).json() + with PREVIEW_HTTP_REQUEST_DURATION.labels(operation="get", endpoint="item").time(): + data = _request("GET", url, headers).json() if isinstance(data, dict): return data raise ValueError("unexpected non-mapping response body") def http_put(url: str, data: dict[str, Any], headers: dict[str, str]) -> int: - return int( - _request( - "PUT", - url, - {**headers, "Content-Type": "application/json"}, - json_body=data, - ).status_code - ) + with PREVIEW_HTTP_REQUEST_DURATION.labels(operation="put", endpoint="item").time(): + return int( + _request( + "PUT", + url, + {**headers, "Content-Type": "application/json"}, + json_body=data, + ).status_code + ) def ensure_collection_thumbnail( @@ -1143,12 +1146,14 @@ def main(argv: Sequence[str] | None = None) -> int: item = Item.from_dict(payload) target_collection = item.collection_id or args.collection - _augment_item( - item, - raster_base=args.raster_base, - collection_id=target_collection, - verbose=args.verbose, - ) + + with PREVIEW_GENERATION_DURATION.labels(collection=target_collection).time(): + _augment_item( + item, + raster_base=args.raster_base, + collection_id=target_collection, + verbose=args.verbose, + ) target_url = f"{args.stac.rstrip('/')}/collections/{target_collection}/items/{item.id}" try: diff --git a/scripts/benchmark_geozarr.py b/scripts/benchmark_geozarr.py index c3b9cdf..7b60e1b 100644 --- a/scripts/benchmark_geozarr.py +++ b/scripts/benchmark_geozarr.py @@ -110,7 +110,7 @@ def main(argv: list[str] | None = None) -> int: if speedup > 
1: logger.info(f"āœ… GeoZarr is {speedup}x faster than EOPF") else: - logger.warning(f"āš ļø EOPF is {1/speedup:.2f}x faster than GeoZarr") + logger.warning(f"āš ļø EOPF is {1 / speedup:.2f}x faster than GeoZarr") return 0 diff --git a/scripts/benchmark_tile_performance.py b/scripts/benchmark_tile_performance.py index 9f2c205..8743539 100644 --- a/scripts/benchmark_tile_performance.py +++ b/scripts/benchmark_tile_performance.py @@ -154,7 +154,7 @@ def benchmark_zoom_level( status = "āœ“" if result["success"] else "āœ—" logger.debug( f" {status} z{z}/{x}/{y}: {result['latency_ms']:.1f}ms " - f"({result['size_bytes']/1024:.1f}KB)" + f"({result['size_bytes'] / 1024:.1f}KB)" ) # Calculate statistics diff --git a/scripts/get_conversion_params.py b/scripts/get_conversion_params.py index da5bb42..9b38867 100644 --- a/scripts/get_conversion_params.py +++ b/scripts/get_conversion_params.py @@ -5,16 +5,24 @@ different satellite collections, enabling the workflow template to use data-driven configuration instead of hard-coded bash conditionals. +Environment Variable Overrides (for testing/debugging): + OVERRIDE_GROUPS: Override groups parameter + OVERRIDE_EXTRA_FLAGS: Override extra_flags parameter + OVERRIDE_SPATIAL_CHUNK: Override spatial_chunk parameter + OVERRIDE_TILE_WIDTH: Override tile_width parameter + Usage: python3 get_conversion_params.py --collection sentinel-1-l1-grd python3 get_conversion_params.py --collection sentinel-2-l2a --format json python3 get_conversion_params.py --collection sentinel-2-l2a --param groups + OVERRIDE_GROUPS="/custom/path" python3 get_conversion_params.py --collection sentinel-2-l2a """ from __future__ import annotations import argparse import json +import os import sys from typing import Any, cast @@ -58,6 +66,12 @@ def _match_collection_config(collection_id: str) -> dict[str, Any] | None: def get_conversion_params(collection_id: str) -> dict[str, Any]: """Get conversion parameters for collection. 
+ Environment variables can override configuration values: + - OVERRIDE_GROUPS: Override groups parameter + - OVERRIDE_EXTRA_FLAGS: Override extra_flags parameter + - OVERRIDE_SPATIAL_CHUNK: Override spatial_chunk parameter (integer) + - OVERRIDE_TILE_WIDTH: Override tile_width parameter (integer) + Args: collection_id: Collection identifier (e.g., sentinel-1-l1-grd-dp-test) @@ -75,7 +89,19 @@ def get_conversion_params(collection_id: str) -> dict[str, Any]: raise ValueError(f"No config for collection {collection_id}") config = default_config - return cast(dict[str, Any], config.get("conversion", {})) + conversion_params = cast(dict[str, Any], config.get("conversion", {})) + + # Apply environment variable overrides (useful for testing/debugging) + return { + "groups": os.getenv("OVERRIDE_GROUPS", conversion_params.get("groups", "")), + "extra_flags": os.getenv("OVERRIDE_EXTRA_FLAGS", conversion_params.get("extra_flags", "")), + "spatial_chunk": int( + os.getenv("OVERRIDE_SPATIAL_CHUNK", str(conversion_params.get("spatial_chunk", 4096))) + ), + "tile_width": int( + os.getenv("OVERRIDE_TILE_WIDTH", str(conversion_params.get("tile_width", 512))) + ), + } def main(argv: list[str] | None = None) -> int: @@ -105,8 +131,9 @@ def main(argv: list[str] | None = None) -> int: try: params = get_conversion_params(args.collection) except ValueError as exc: + # Use print for CLI output, not logging print(f"Error: {exc}", file=sys.stderr) - return 1 + sys.exit(1) if args.param: # Output single parameter (for shell variable assignment) diff --git a/scripts/metrics.py b/scripts/metrics.py new file mode 100644 index 0000000..e030e58 --- /dev/null +++ b/scripts/metrics.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +"""Prometheus metrics instrumentation for data-pipeline scripts. + +This module provides shared metric definitions and a metrics server +for exposing metrics to the Prometheus scraper in Kubernetes. 
+ +Usage: + from scripts.metrics import start_metrics_server, CONVERSION_DURATION + + start_metrics_server(port=8000)  # In main() + + with CONVERSION_DURATION.labels(collection="sentinel-2-l2a").time(): + convert_data() +""" + +from __future__ import annotations + +import logging +import os + +from prometheus_client import Counter, Histogram, start_http_server + +logger = logging.getLogger(__name__) + +# Metrics port for Kubernetes ServiceMonitor to scrape +DEFAULT_METRICS_PORT = 8000 + +# Conversion workflow metrics +CONVERSION_DURATION = Histogram( + "geozarr_conversion_seconds", + "Time to convert source to GeoZarr format", + labelnames=["collection", "resolution"], +) + +CONVERSION_DATA_SIZE = Histogram( + "geozarr_conversion_bytes", + "Size of data converted in bytes", + labelnames=["collection"], + buckets=[1e6, 10e6, 100e6, 1e9, 10e9, 100e9],  # 1MB to 100GB +) + +# STAC API interaction metrics +# Labels must match call sites in register_stac.py: +#   .labels(collection=..., operation=..., status=...) +STAC_REGISTRATION_TOTAL = Counter( + "stac_registration_total", + "Total STAC item registration attempts", + labelnames=["collection", "operation", "status"],  # operation: create|update|skip|replace; status: success|error +) + +STAC_HTTP_REQUEST_DURATION = Histogram( + "stac_http_request_duration_seconds", + "STAC API HTTP request duration", + labelnames=["operation", "endpoint"],  # operation: get|put|post|delete; endpoint: item|items +) + +# Preview generation metrics (labels match augment_stac_item.py call sites) +PREVIEW_GENERATION_DURATION = Histogram( + "preview_generation_duration_seconds", + "Time to generate preview images", + labelnames=["collection"], +) + +PREVIEW_HTTP_REQUEST_DURATION = Histogram( + "preview_http_request_duration_seconds", + "HTTP request duration for preview-related operations", + labelnames=["operation", "endpoint"],  # operation: get|put; endpoint: item +) + +# AMQP workflow metrics +AMQP_PUBLISH_TOTAL = Counter( + "amqp_publish_total", + "Total AMQP messages published", + labelnames=["exchange", "status"],  # status: success|failure +) + + +def start_metrics_server(port: int | None = None) -> None: + """Start Prometheus metrics HTTP server.
+ + Args: + port: Port to listen on. Defaults to METRICS_PORT env var or 8000. + + Note: + Should only be called once per process. Safe to call in Kubernetes + pod startup. Metrics exposed at http://localhost:/metrics + """ + if port is None: + port = int(os.getenv("METRICS_PORT", str(DEFAULT_METRICS_PORT))) + + try: + start_http_server(port) + logger.info("Metrics server started on port %d", port) + except OSError as e: + # Port already in use (e.g., from previous run) + logger.warning("Failed to start metrics server on port %d: %s", port, e) + + +def is_metrics_enabled() -> bool: + """Check if metrics collection is enabled. + + Returns: + True if ENABLE_METRICS env var is set to "true" (case-insensitive). + Defaults to True if not set (opt-out model). + """ + return os.getenv("ENABLE_METRICS", "true").lower() == "true" diff --git a/scripts/publish_amqp.py b/scripts/publish_amqp.py index f3c5328..1cb5239 100644 --- a/scripts/publish_amqp.py +++ b/scripts/publish_amqp.py @@ -27,10 +27,10 @@ def load_payload(payload_file: Path) -> dict[str, Any]: data: dict[str, Any] = json.loads(payload_file.read_text()) return data except FileNotFoundError: - logger.error("Payload file not found: %s", payload_file) + logger.exception("Payload file not found", extra={"file": str(payload_file)}) sys.exit(1) - except json.JSONDecodeError as e: - logger.error("Invalid JSON in payload file: %s", e) + except json.JSONDecodeError: + logger.exception("Invalid JSON in payload file", extra={"file": str(payload_file)}) sys.exit(1) @@ -41,8 +41,11 @@ def format_routing_key(template: str, payload: dict[str, Any]) -> str: """ try: return template.format(**payload) - except KeyError as e: - logger.error("Missing field %s in payload for routing key template", e) + except KeyError: + logger.exception( + "Missing required field in payload for routing key template", + extra={"template": template, "available_fields": list(payload.keys())}, + ) sys.exit(1) @@ -124,8 +127,15 @@ def main() -> None: 
payload=payload, virtual_host=args.virtual_host, ) - except Exception as e: - logger.error("Failed to publish message: %s", e) + except Exception: + logger.exception( + "Failed to publish AMQP message", + extra={ + "exchange": args.exchange, + "routing_key": routing_key, + "host": args.host, + }, + ) sys.exit(1) diff --git a/scripts/register_stac.py b/scripts/register_stac.py index 102f31a..2566020 100644 --- a/scripts/register_stac.py +++ b/scripts/register_stac.py @@ -15,11 +15,13 @@ import logging import os import sys +import time from typing import Any, cast from urllib.parse import urlparse import httpx import xarray as xr +from metrics import STAC_HTTP_REQUEST_DURATION, STAC_REGISTRATION_TOTAL from tenacity import retry, stop_after_attempt, wait_exponential # Config: override via env vars @@ -394,7 +396,8 @@ def register_item( with httpx.Client(timeout=TIMEOUT) as client: # Check if item exists try: - response = client.get(item_url, headers=headers) + with STAC_HTTP_REQUEST_DURATION.labels(operation="get", endpoint="item").time(): + response = client.get(item_url, headers=headers) exists = response.status_code == 200 except httpx.HTTPError: exists = False @@ -404,38 +407,65 @@ def register_item( if mode == "create-or-skip": logger.info("Skipping (mode=create-or-skip)") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="skip", status="success" + ).inc() return elif mode in ("upsert", "update"): logger.info("Updating existing item (mode=upsert)") - response = client.put(item_url, json=item, headers=headers) + with STAC_HTTP_REQUEST_DURATION.labels(operation="put", endpoint="item").time(): + response = client.put(item_url, json=item, headers=headers) if response.status_code >= 400: logger.error(f" {response.status_code} {response.reason_phrase}") logger.info(f"Response body: {response.text}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="update", status="error" + ).inc() response.raise_for_status() 
logger.info(f"Successfully updated item {item_id}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="update", status="success" + ).inc() elif mode in ("force", "replace"): logger.info("Deleting and recreating (mode=replace)") - client.delete(item_url, headers=headers) - response = client.post(items_url, json=item, headers=headers) + with STAC_HTTP_REQUEST_DURATION.labels(operation="delete", endpoint="item").time(): + client.delete(item_url, headers=headers) + with STAC_HTTP_REQUEST_DURATION.labels(operation="post", endpoint="items").time(): + response = client.post(items_url, json=item, headers=headers) if response.status_code >= 400: logger.error(f" {response.status_code} {response.reason_phrase}") logger.info(f"Response body: {response.text}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="replace", status="error" + ).inc() response.raise_for_status() logger.info(f"Successfully replaced item {item_id}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="replace", status="success" + ).inc() else: raise ValueError(f"Unknown mode: {mode}") else: logger.info(f"Creating new item {item_id}") - response = client.post(items_url, json=item, headers=headers) + with STAC_HTTP_REQUEST_DURATION.labels(operation="post", endpoint="items").time(): + response = client.post(items_url, json=item, headers=headers) if response.status_code >= 400: logger.error(f" {response.status_code} {response.reason_phrase}") logger.info(f"Response body: {response.text}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="create", status="error" + ).inc() response.raise_for_status() logger.info(f"Successfully created item {item_id}") + STAC_REGISTRATION_TOTAL.labels( + collection=collection_id, operation="create", status="success" + ).inc() def main() -> int: """CLI entrypoint.""" + start_time = time.perf_counter() + parser = argparse.ArgumentParser(description="Register GeoZarr output to STAC API") 
parser.add_argument( "--stac", @@ -510,11 +540,13 @@ def main() -> int: headers=headers, ) - logger.info("Registration complete") + duration = time.perf_counter() - start_time + logger.info(f"Registration complete in {duration:.2f}s") return 0 except Exception as exc: - logger.error(f" {exc}") + duration = time.perf_counter() - start_time + logger.error(f"Registration failed after {duration:.2f}s: {exc}") import traceback traceback.print_exc() diff --git a/tests/unit/test_get_conversion_params.py b/tests/unit/test_get_conversion_params.py index 9b76e81..5c02b57 100644 --- a/tests/unit/test_get_conversion_params.py +++ b/tests/unit/test_get_conversion_params.py @@ -1,6 +1,7 @@ """Tests for get_conversion_params.py - Collection registry logic.""" import json +import os import pytest @@ -160,3 +161,66 @@ def test_unknown_collection_uses_default(self, capsys): captured = capsys.readouterr() # Should fall back to S2 default assert "ZARR_GROUPS='/quality/l2a_quicklook/r10m'" in captured.out + + +class TestEnvironmentVariableOverrides: + """Test environment variable override functionality.""" + + def test_override_groups(self, monkeypatch): + """OVERRIDE_GROUPS overrides default groups.""" + monkeypatch.setenv("OVERRIDE_GROUPS", "/custom/groups") + params = get_conversion_params("sentinel-2-l2a") + assert params["groups"] == "/custom/groups" + assert params["spatial_chunk"] == 4096 # Other params unchanged + + def test_override_extra_flags(self, monkeypatch): + """OVERRIDE_EXTRA_FLAGS overrides default flags.""" + monkeypatch.setenv("OVERRIDE_EXTRA_FLAGS", "--custom-flag") + params = get_conversion_params("sentinel-1-l1-grd") + assert params["extra_flags"] == "--custom-flag" + + def test_override_spatial_chunk(self, monkeypatch): + """OVERRIDE_SPATIAL_CHUNK overrides default chunk size.""" + monkeypatch.setenv("OVERRIDE_SPATIAL_CHUNK", "8192") + params = get_conversion_params("sentinel-2-l2a") + assert params["spatial_chunk"] == 8192 + assert 
isinstance(params["spatial_chunk"], int) + + def test_override_tile_width(self, monkeypatch): + """OVERRIDE_TILE_WIDTH overrides default tile width.""" + monkeypatch.setenv("OVERRIDE_TILE_WIDTH", "1024") + params = get_conversion_params("sentinel-1-l1-grd") + assert params["tile_width"] == 1024 + assert isinstance(params["tile_width"], int) + + def test_multiple_overrides(self, monkeypatch): + """Multiple overrides work together.""" + monkeypatch.setenv("OVERRIDE_GROUPS", "/test/path") + monkeypatch.setenv("OVERRIDE_SPATIAL_CHUNK", "2048") + params = get_conversion_params("sentinel-2-l2a") + assert params["groups"] == "/test/path" + assert params["spatial_chunk"] == 2048 + # Non-overridden values remain default + assert params["extra_flags"] == "--crs-groups /quality/l2a_quicklook/r10m" + + def test_override_empty_string(self, monkeypatch): + """Empty string override is allowed.""" + monkeypatch.setenv("OVERRIDE_EXTRA_FLAGS", "") + params = get_conversion_params("sentinel-1-l1-grd") + assert params["extra_flags"] == "" + + def test_no_override_uses_default(self): + """Without env vars, uses configuration defaults.""" + # Ensure no env vars are set + for var in [ + "OVERRIDE_GROUPS", + "OVERRIDE_EXTRA_FLAGS", + "OVERRIDE_SPATIAL_CHUNK", + "OVERRIDE_TILE_WIDTH", + ]: + if var in os.environ: + del os.environ[var] + + params = get_conversion_params("sentinel-2-l2a") + assert params["groups"] == "/quality/l2a_quicklook/r10m" + assert params["spatial_chunk"] == 4096 diff --git a/uv.lock b/uv.lock index b82120d..e39ddd2 100644 --- a/uv.lock +++ b/uv.lock @@ -431,6 +431,7 @@ dependencies = [ { name = "click" }, { name = "httpx" }, { name = "pika" }, + { name = "prometheus-client" }, { name = "pystac" }, { name = "requests" }, { name = "s3fs" }, @@ -458,6 +459,7 @@ requires-dist = [ { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.11.0" }, { name = "pika", specifier = ">=1.3.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.7.0" }, + { 
name = "prometheus-client", specifier = ">=0.19.0" }, { name = "pystac", specifier = ">=1.10.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1.0" }, @@ -1090,6 +1092,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5b/a5/987a405322d78a73b66e39e4a90e4ef156fd7141bf71df987e50717c321b/pre_commit-4.3.0-py2.py3-none-any.whl", hash = "sha256:2b0747ad7e6e967169136edffee14c16e148a778a54e4f967921aa1ebf2308d8", size = 220965, upload-time = "2025-08-09T18:56:13.192Z" }, ] +[[package]] +name = "prometheus-client" +version = "0.23.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/23/53/3edb5d68ecf6b38fcbcc1ad28391117d2a322d9a1a3eff04bfdb184d8c3b/prometheus_client-0.23.1.tar.gz", hash = "sha256:6ae8f9081eaaaf153a2e959d2e6c4f4fb57b12ef76c8c7980202f1e57b48b2ce", size = 80481, upload-time = "2025-09-18T20:47:25.043Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b8/db/14bafcb4af2139e046d03fd00dea7873e48eafe18b7d2797e73d6681f210/prometheus_client-0.23.1-py3-none-any.whl", hash = "sha256:dd1913e6e76b59cfe44e7a4b83e01afc9873c1bdfd2ed8739f1e76aeca115f99", size = 61145, upload-time = "2025-09-18T20:47:23.875Z" }, +] + [[package]] name = "propcache" version = "0.4.0" diff --git a/validate-setup.sh b/validate-setup.sh index d97b6a6..bff2eaf 100755 --- a/validate-setup.sh +++ b/validate-setup.sh @@ -4,6 +4,9 @@ set -euo pipefail +# Error trap for better debugging +trap 'echo "āŒ Validation failed at line $LINENO with exit code $?"' ERR + NAMESPACE="${NAMESPACE:-devseed}" PASS=0 FAIL=0 diff --git a/workflows/template.yaml b/workflows/template.yaml index ddd4757..380ec25 100644 --- a/workflows/template.yaml +++ b/workflows/template.yaml @@ -38,7 +38,7 @@ spec: - name: s3_output_prefix value: "tests-output" - name: pipeline_image_version - value: "v26" # v26 includes Dask parallel processing + value: 
"feat-prometheus-metrics" # Prometheus metrics integration templates: - name: main @@ -77,6 +77,13 @@ spec: image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}} imagePullPolicy: Always command: [bash, -c] + resources: + requests: + memory: "6Gi" + cpu: "2" + limits: + memory: "10Gi" + cpu: "4" args: - | set -euo pipefail @@ -172,6 +179,13 @@ spec: image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}} imagePullPolicy: Always command: [bash] + resources: + requests: + memory: "2Gi" + cpu: "1" + limits: + memory: "4Gi" + cpu: "2" source: | set -euo pipefail @@ -218,9 +232,24 @@ spec: image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}} imagePullPolicy: Always command: [bash] + ports: + - containerPort: 8000 + name: metrics + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "1" source: | set -euo pipefail + # Start metrics server in background (for Prometheus scraping) + python -c "from scripts.metrics import start_metrics_server; start_metrics_server()" & + METRICS_PID=$! + trap "kill $METRICS_PID 2>/dev/null || true" EXIT + echo "════════════════════════════════════════════════════════════════════════════" echo " STEP 3/4: STAC REGISTRATION" echo "════════════════════════════════════════════════════════════════════════════" @@ -252,9 +281,21 @@ spec: image: ghcr.io/eopf-explorer/data-pipeline:{{workflow.parameters.pipeline_image_version}} imagePullPolicy: Always command: [bash] + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "2Gi" + cpu: "1" source: | set -euo pipefail + # Start metrics server in background (for Prometheus scraping) + python -c "from scripts.metrics import start_metrics_server; start_metrics_server()" & + METRICS_PID=$! 
+ trap "kill $METRICS_PID 2>/dev/null || true" EXIT + echo "════════════════════════════════════════════════════════════════════════════" echo " STEP 4/4: STAC AUGMENTATION" echo "════════════════════════════════════════════════════════════════════════════"