26 changes: 26 additions & 0 deletions .github/workflows/test.yml
@@ -52,3 +52,29 @@ jobs:
with:
files: ./coverage.xml
fail_ci_if_error: false

integration-tests:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
strategy:
matrix:
python-version: ["3.11"]

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"

- name: Install dependencies
run: uv sync --all-extras

- name: Run integration tests
run: uv run pytest tests/integration/ -v --tb=short
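
For context, the new job runs whatever lives under `tests/integration/`. A minimal sketch of such a test follows; the `integration` marker, the STAC endpoint default, and the assertions are assumptions for illustration, not files added by this PR:

```python
# tests/integration/test_stac_api.py — illustrative sketch only.
import os

import httpx
import pytest

STAC_API = os.environ.get("STAC_API_URL", "https://api.explorer.eopf.copernicus.eu/stac")


@pytest.mark.integration
def test_stac_api_reachable():
    """The STAC API landing page should respond and identify as a catalog."""
    resp = httpx.get(STAC_API, timeout=30)
    resp.raise_for_status()
    assert resp.json().get("type") == "Catalog"


@pytest.mark.integration
def test_collections_listed():
    """At least one collection should be published."""
    resp = httpx.get(f"{STAC_API}/collections", timeout=30)
    resp.raise_for_status()
    assert resp.json().get("collections")
```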
137 changes: 68 additions & 69 deletions README.md
@@ -1,123 +1,122 @@
# EOPF GeoZarr Data Pipeline

Automated pipeline for converting Sentinel Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration and interactive visualization.
Automated Kubernetes pipeline for converting Sentinel Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration.

## Quick Start (30 seconds)
## Quick Start

```bash
# 1. Submit workflow
export KUBECONFIG=.work/kubeconfig
kubectl create -f workflows/run-s1-test.yaml -n devseed-staging

# 2. Monitor
kubectl logs -n devseed-staging -l workflows.argoproj.io/workflow=<name> -c main -f
kubectl get wf -n devseed-staging -w
```

📖 **New here?** [GETTING_STARTED.md](GETTING_STARTED.md) • **Details:** [Full docs below](#submitting-workflows)
📖 **First time?** See [GETTING_STARTED.md](GETTING_STARTED.md) for full setup
🎯 **Monitor:** [Argo UI](https://argo-workflows.hub-eopf-explorer.eox.at)

## What It Does

**Input:** STAC item URL → **Output:** Interactive web map in ~15-20 minutes

```
Convert (15 min) → Register (30 sec) → Augment (10 sec)
```

**Supports:** Sentinel-1 GRD (SAR) • Sentinel-2 L2A (optical)

**Prerequisites:** Kubernetes with [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) • Python 3.11+ • [GETTING_STARTED.md](GETTING_STARTED.md) for full setup

## Submitting Workflows
**Input:** STAC item URL → **Output:** Cloud-optimized GeoZarr + Interactive map (~15-20 min)

| Method | Best For | Setup | Status |
|--------|----------|-------|--------|
| 🎯 **kubectl** | Testing, CI/CD | None | ✅ Recommended |
| 📓 **Jupyter** | Learning, exploration | 2 min | ✅ Working |
| ⚡ **Event-driven** | Production (auto) | In-cluster | ✅ Running |
| 🐍 **Python CLI** | Scripting | Port-forward | ⚠️ Advanced |
**Supports:** Sentinel-1 GRD, Sentinel-2 L2A
**Stack:** Argo Workflows • [eopf-geozarr](https://github.com/EOPF-Explorer/data-model) • Dask • RabbitMQ • Prometheus
**Resources:** 6Gi memory, burstable CPU per workflow

<details>
<summary><b>kubectl</b> (recommended)</summary>
## Monitoring

```bash
export KUBECONFIG=.work/kubeconfig
kubectl create -f workflows/run-s1-test.yaml -n devseed-staging -o name
kubectl logs -n devseed-staging -l workflows.argoproj.io/workflow=<wf-name> -c main -f
# Health check
kubectl get wf -n devseed-staging --field-selector status.phase=Running

# Recent workflows (last hour)
kubectl get wf -n devseed-staging --sort-by=.metadata.creationTimestamp | tail -10
```
Edit `workflows/run-s1-test.yaml` with your STAC URL and collection.
</details>

<details>
<summary><b>Jupyter</b></summary>
**Web UI:** [Argo Workflows](https://argo-workflows.hub-eopf-explorer.eox.at)

## Usage

### kubectl (Testing)
```bash
uv sync --extra notebooks
cp notebooks/.env.example notebooks/.env
uv run jupyter lab notebooks/operator.ipynb
kubectl create -f workflows/run-s1-test.yaml -n devseed-staging
```
</details>

<details>
<summary><b>Event-driven</b> (production)</summary>
**Namespaces:** `devseed-staging` (testing) • `devseed` (production)

### Event-driven (Production)
Publish to RabbitMQ `geozarr` exchange:
```json
{"source_url": "https://stac.../items/S1A_...", "item_id": "S1A_IW_GRDH_...", "collection": "sentinel-1-l1-grd-dp-test"}
{"source_url": "https://stac.../items/...", "item_id": "...", "collection": "..."}
```
</details>
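
For reference, a minimal publisher sketch using `pika` (already a project dependency). The `geozarr` exchange name comes from this README; the username, routing key, and localhost port-forward are assumptions to adapt to your cluster:

```python
# Illustrative only: publish a conversion request to the geozarr exchange.
# Assumes `kubectl port-forward -n core svc/rabbitmq 5672:5672` is running and
# the RabbitMQ password from Configuration is exported as AMQP_PASSWORD.
import json
import os

import pika

credentials = pika.PlainCredentials("user", os.environ["AMQP_PASSWORD"])  # username is an assumption
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)
channel = connection.channel()

payload = {
    "source_url": "https://stac.../items/...",
    "item_id": "...",
    "collection": "...",
}
channel.basic_publish(
    exchange="geozarr",
    routing_key="",  # assumption: match the binding your sensor expects
    body=json.dumps(payload).encode(),
)
connection.close()
```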

<details>
<summary><b>Python CLI</b></summary>

### Jupyter Notebooks
```bash
kubectl port-forward -n core svc/rabbitmq 5672:5672
export AMQP_PASSWORD=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)
uv run python examples/submit.py --stac-url "..." --collection sentinel-2-l2a
uv sync --extra notebooks
cp notebooks/.env.example notebooks/.env
uv run jupyter lab notebooks/
```
</details>

**Related:** [data-model](https://github.com/EOPF-Explorer/data-model) • [platform-deploy](https://github.com/EOPF-Explorer/platform-deploy) • [Testing report](docs/WORKFLOW_SUBMISSION_TESTING.md)
See [examples/](examples/) for more patterns.

## Configuration

<details>
<summary><b>S3 & RabbitMQ</b></summary>

```bash
# S3 credentials
# S3 credentials (OVH S3)
kubectl create secret generic geozarr-s3-credentials -n devseed \
--from-literal=AWS_ACCESS_KEY_ID="<key>" \
--from-literal=AWS_SECRET_ACCESS_KEY="<secret>"
--from-literal=AWS_ACCESS_KEY_ID="..." \
--from-literal=AWS_SECRET_ACCESS_KEY="..." \
--from-literal=AWS_ENDPOINT_URL="https://s3.de.io.cloud.ovh.net"

# S3 output location
# Bucket: esa-zarr-sentinel-explorer-fra
# Prefix: tests-output (staging) or geozarr (production)

# RabbitMQ password
# Get RabbitMQ password
kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d
```

**Endpoints:** S3: `s3.de.io.cloud.ovh.net/esa-zarr-sentinel-explorer-fra` • RabbitMQ: `geozarr` exchange • [UIs](https://workspace.devseed.hub-eopf-explorer.eox.at/): [Argo](https://argo-workflows.hub-eopf-explorer.eox.at) • [STAC](https://api.explorer.eopf.copernicus.eu/stac) • [Viewer](https://api.explorer.eopf.copernicus.eu/raster)
</details>
# STAC API endpoints
# STAC API: https://api.explorer.eopf.copernicus.eu/stac
# Raster API: https://api.explorer.eopf.copernicus.eu/raster
```
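
To sanity-check the credentials from Python, a short sketch using `boto3` (already a pipeline dependency); the bucket, prefix, and endpoint are the values listed above:

```python
# Sketch: confirm the S3 credentials can list the staging output prefix.
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are read from the environment.
import os

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL", "https://s3.de.io.cloud.ovh.net"),
)
resp = s3.list_objects_v2(
    Bucket="esa-zarr-sentinel-explorer-fra",
    Prefix="tests-output/",
    MaxKeys=5,
)
for obj in resp.get("Contents", []):
    print(obj["Key"])
```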

## Troubleshooting

<details>
<summary><b>Logs & Issues</b></summary>

```bash
kubectl get wf -n devseed-staging -w
# Check workflow status
kubectl get wf -n devseed-staging --sort-by=.metadata.creationTimestamp | tail -5

# View logs
kubectl logs -n devseed-staging <pod-name> -c main -f
kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50

# Check resources
kubectl top nodes
```

**Common fixes:** Workflow not starting → check sensor logs • S3 denied → verify `geozarr-s3-credentials` secret • RabbitMQ refused → `kubectl port-forward -n core svc/rabbitmq 5672:5672` • Pod pending → check resources
</details>
**Common issues:**
- **Workflow not starting:** Check sensor logs: `kubectl logs -n devseed -l sensor-name=geozarr-sensor`
- **S3 errors:** Verify credentials secret exists
- **Pod pending:** Check node capacity with `kubectl top nodes`

**Performance:** S1 GRD (10GB): 15-20 min • S2 L2A (5GB): 8-12 min • expect longer runs (and consider raising resource limits) for datasets >20GB

See [GETTING_STARTED.md](GETTING_STARTED.md#troubleshooting) for more.

## Development

```bash
uv sync --all-extras && pre-commit install
make test # or: pytest tests/ -v -k e2e
# Setup
uv sync --all-extras
pre-commit install

# Test
pytest tests/ -v # 100/100 passing

# Deploy
kubectl apply -f workflows/template.yaml -n devseed
```

**Deploy:** Edit `workflows/template.yaml` or `scripts/*.py` → `pytest tests/ -v` → `docker buildx build --platform linux/amd64 -t ghcr.io/eopf-explorer/data-pipeline:dev .` → `kubectl apply -f workflows/template.yaml -n devseed` • [CONTRIBUTING.md](CONTRIBUTING.md)
**Project structure:** `workflows/` (manifests) • `scripts/` (Python utils) • `tests/` (pytest) • `notebooks/` (tutorials)

**Documentation:** [CONTRIBUTING.md](CONTRIBUTING.md) • [GETTING_STARTED.md](GETTING_STARTED.md)

## License

23 changes: 12 additions & 11 deletions docker/Dockerfile
@@ -16,19 +16,20 @@ WORKDIR /app
# Install uv for fast dependency resolution
RUN pip install -U pip uv

# Cachebust for data-model installation (change timestamp to force fresh install)
ARG CACHEBUST=2025-10-09T00:00:00Z
# Use git commit SHA for precise cache control
# Update via: docker build --build-arg DATA_MODEL_COMMIT=$(git ls-remote https://github.com/EOPF-Explorer/data-model.git refs/heads/fix/s1-encoding-conflict | cut -f1)
ARG DATA_MODEL_COMMIT=fix/s1-encoding-conflict

# Install eopf-geozarr from fix/s1-encoding-conflict branch (includes dask[distributed])
# Install eopf-geozarr from data-model (includes dask[distributed])
RUN uv pip install --system --no-cache \
git+https://github.com/EOPF-Explorer/data-model.git@fix/s1-encoding-conflict \
pystac>=1.10.0 \
httpx>=0.27.0 \
boto3>=1.34.0 \
tenacity>=8.0.0

# Force fresh copy of scripts (invalidate cache)
ARG SCRIPTS_VERSION=2025-10-09T00:00:00Z
git+https://github.com/EOPF-Explorer/data-model.git@${DATA_MODEL_COMMIT}

# Copy project files for dependency installation
COPY pyproject.toml README.md /app/
RUN uv pip install --system --no-cache /app

# Copy scripts (cache invalidated by content changes, not manual ARG)
ARG SCRIPTS_VERSION=auto

# Copy scripts
COPY scripts/ /app/scripts/
100 changes: 100 additions & 0 deletions docs/prometheus-metrics.md
@@ -0,0 +1,100 @@
# Prometheus Metrics

## Metrics Collected

Pipeline scripts expose Prometheus metrics for observability. The metrics server listens on port 8000 in workflow pods.

### STAC Registration (`register_stac.py`)
```python
stac_registration_total{collection, operation, status}
# operation: create|update|skip|replace
# status: success|error
# Track failures, operation distribution

stac_http_request_duration_seconds{operation, endpoint}
# operation: get|put|post|delete
# endpoint: item|items
# STAC API latency, set SLOs
```

### Preview Generation (`augment_stac_item.py`)
```python
preview_generation_duration_seconds{collection}
# Augmentation performance by collection

preview_http_request_duration_seconds{operation, endpoint}
# operation: get|put
# STAC API response times during augmentation
```
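
As an illustration of the instrumentation pattern behind these metrics, here is a sketch using `prometheus_client` (added as a dependency in this PR); the metric names and labels match the tables above, but the surrounding function is hypothetical, not the actual script code:

```python
# Illustrative sketch, not the real register_stac.py.
import httpx  # pipeline dependency
from prometheus_client import Counter, Histogram, start_http_server

STAC_REGISTRATION_TOTAL = Counter(
    "stac_registration_total",
    "STAC item registrations by outcome",
    ["collection", "operation", "status"],
)
STAC_HTTP_REQUEST_DURATION = Histogram(
    "stac_http_request_duration_seconds",
    "Latency of STAC API requests",
    ["operation", "endpoint"],
)


def register_item(stac_api: str, collection: str, item: dict) -> None:
    """Create a STAC item and record outcome + latency."""
    with STAC_HTTP_REQUEST_DURATION.labels(operation="post", endpoint="items").time():
        resp = httpx.post(f"{stac_api}/collections/{collection}/items", json=item)
    status = "success" if resp.is_success else "error"
    STAC_REGISTRATION_TOTAL.labels(
        collection=collection, operation="create", status=status
    ).inc()


if __name__ == "__main__":
    start_http_server(8000)  # port documented above
```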

## Key Queries

**Success Rate (SLO: >99%)**
```promql
sum(rate(stac_registration_total{status="success"}[5m])) / sum(rate(stac_registration_total[5m]))
```

**Errors by Collection**
```promql
sum(rate(stac_registration_total{status="error"}[5m])) by (collection)
```

**STAC API Latency P95 (SLO: <500ms)**
```promql
histogram_quantile(0.95, rate(stac_http_request_duration_seconds_bucket[5m])) by (operation)
```

**Preview Duration P95 (SLO: <10s)**
```promql
histogram_quantile(0.95, rate(preview_generation_duration_seconds_bucket[5m])) by (collection)
```

**Throughput (items/min)**
```promql
sum(rate(stac_registration_total[5m])) * 60
```

## Setup

Prometheus scrapes via PodMonitor (deployed in `platform-deploy/workspaces/devseed*/data-pipeline/`).

**Verify:**
```bash
kubectl port-forward -n core svc/prometheus-operated 9090:9090
# http://localhost:9090/targets → "geozarr-workflows"
```

## Grafana Dashboards

- **Overview**: Success rate, throughput, error rate by collection
- **Performance**: P95 latencies (STAC API, preview generation)
- **Capacity**: Peak load, processing rate trends

## Alerts

**High Failure Rate**
```yaml
expr: rate(stac_registration_total{status="error"}[5m]) / rate(stac_registration_total[5m]) > 0.1
for: 5m
# Check STAC API status, verify auth tokens
```

**Slow Preview Generation**
```yaml
expr: histogram_quantile(0.95, rate(preview_generation_duration_seconds_bucket[5m])) > 60
for: 10m
# Check TiTiler API or asset access
```

**STAC API Latency**
```yaml
expr: histogram_quantile(0.95, rate(stac_http_request_duration_seconds_bucket[5m])) > 1
for: 10m
# Database overload or network issues
```

## SLOs

- **Success Rate**: >99%
- **STAC API P95**: <500ms
- **Preview P95**: <10s
10 changes: 5 additions & 5 deletions notebooks/02_pyramid_performance.ipynb
@@ -399,7 +399,7 @@
"plt.show()\n",
"\n",
"print(\n",
" f\"\\n📊 Key Metric: {np.mean([s for z, s in zip(zooms, [measured[i]/expected[i] for i in range(len(zooms))], strict=False) if z <= 10]):.1f}× average speedup at production-relevant zooms\"\n",
" f\"\\n📊 Key Metric: {np.mean([s for z, s in zip(zooms, [measured[i] / expected[i] for i in range(len(zooms))], strict=False) if z <= 10]):.1f}× average speedup at production-relevant zooms\"\n",
")"
]
},
@@ -426,15 +426,15 @@
"print(\"Return on Investment:\")\n",
"print(\"=\" * 60)\n",
"print(\"Storage Cost:\")\n",
"print(f\" Native only: {native_storage:,} pixels ({native_storage/1e6:.0f} MB uncompressed)\")\n",
"print(f\" With pyramids: {total_storage:,} pixels ({total_storage/1e6:.0f} MB uncompressed)\")\n",
"print(f\" Native only: {native_storage:,} pixels ({native_storage / 1e6:.0f} MB uncompressed)\")\n",
"print(f\" With pyramids: {total_storage:,} pixels ({total_storage / 1e6:.0f} MB uncompressed)\")\n",
"print(f\" Overhead: +{overhead_pct:.0f}%\")\n",
"print(\"\\nPerformance Gain:\")\n",
"print(\n",
" f\" z6-10 (low zoom): {np.mean([measured[i]/expected[i] for i, z in enumerate(zooms) if z <= 10]):.1f}× faster\"\n",
" f\" z6-10 (low zoom): {np.mean([measured[i] / expected[i] for i, z in enumerate(zooms) if z <= 10]):.1f}× faster\"\n",
")\n",
"print(\n",
" f\" z12-14 (high zoom): {np.mean([measured[i]/expected[i] for i, z in enumerate(zooms) if z >= 12]):.1f}× faster\"\n",
" f\" z12-14 (high zoom): {np.mean([measured[i] / expected[i] for i, z in enumerate(zooms) if z >= 12]):.1f}× faster\"\n",
")\n",
"print(\"\\nProduction Impact:\")\n",
"print(\" • Consistent 100-200ms tile generation across all zooms\")\n",
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -31,6 +31,7 @@ dependencies = [
"pika>=1.3.0",
"tenacity>=8.0.0",
"requests>=2.31.0",
"prometheus-client>=0.19.0",
]

[project.optional-dependencies]
@@ -55,6 +56,7 @@ packages = ["scripts"]
[tool.pytest.ini_options]
minversion = "8.0"
testpaths = ["tests"]
pythonpath = ["scripts"] # Fix import resolution for tests
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]