Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions scripts/get_conversion_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Generate GeoZarr conversion parameters from collection registry.

This script exports conversion parameters (groups, flags, chunks) for
different satellite collections, enabling the workflow template to use
data-driven configuration instead of hard-coded bash conditionals.

Usage:
python3 get_conversion_params.py --collection sentinel-1-l1-grd
python3 get_conversion_params.py --collection sentinel-2-l2a --format json
python3 get_conversion_params.py --collection sentinel-2-l2a --param groups
"""

from __future__ import annotations

import argparse
import json
import sys
from typing import Any, cast

# Import collection configs from augment_stac_item
# In production, this would be a shared module
_COLLECTION_CONFIGS: dict[str, dict[str, Any]] = {
"sentinel-1-l1-grd": {
"pattern": "sentinel-1-l1-grd*",
"conversion": {
"groups": "/measurements",
"extra_flags": "--gcp-group /conditions/gcp",
"spatial_chunk": 2048,
"tile_width": 512,
},
},
"sentinel-2-l2a": {
"pattern": "sentinel-2-l2a*",
"conversion": {
"groups": "/quality/l2a_quicklook/r10m",
"extra_flags": "--crs-groups /quality/l2a_quicklook/r10m",
"spatial_chunk": 4096,
"tile_width": 512,
},
},
}

_DEFAULT_COLLECTION = "sentinel-2-l2a"


def _match_collection_config(collection_id: str) -> dict[str, Any] | None:
"""Match collection ID to configuration using pattern matching."""
for _key, config in _COLLECTION_CONFIGS.items():
# mypy needs help understanding .items() returns dict values
cfg = cast(dict[str, Any], config) # type: ignore[redundant-cast]
pattern = str(cfg.get("pattern", ""))
if collection_id.startswith(pattern.rstrip("*")):
return cfg
return None


def get_conversion_params(collection_id: str) -> dict[str, Any]:
"""Get conversion parameters for collection.

Args:
collection_id: Collection identifier (e.g., sentinel-1-l1-grd-dp-test)

Returns:
Dict of conversion parameters (groups, extra_flags, spatial_chunk, tile_width)

Raises:
ValueError: If collection not found in registry
"""
config = _match_collection_config(collection_id)
if not config:
# Fallback to default - mypy needs help with dict.get() return type
default_config = cast(dict[str, Any] | None, _COLLECTION_CONFIGS.get(_DEFAULT_COLLECTION)) # type: ignore[redundant-cast]
if not default_config:
raise ValueError(f"No config for collection {collection_id}")
config = default_config

return cast(dict[str, Any], config.get("conversion", {}))


def main(argv: list[str] | None = None) -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Get GeoZarr conversion parameters from collection registry"
)
parser.add_argument(
"--collection",
required=True,
help="Collection ID (e.g., sentinel-1-l1-grd, sentinel-2-l2a-dp-test)",
)
parser.add_argument(
"--format",
choices=["shell", "json"],
default="shell",
help="Output format (shell vars or JSON)",
)
parser.add_argument(
"--param",
choices=["groups", "extra_flags", "spatial_chunk", "tile_width"],
help="Get single parameter (for shell scripts)",
)

args = parser.parse_args(argv)

try:
params = get_conversion_params(args.collection)
except ValueError as exc:
print(f"Error: {exc}", file=sys.stderr)
return 1

if args.param:
# Output single parameter (for shell variable assignment)
value = params.get(args.param, "")
print(value)
elif args.format == "json":
# Output JSON (for parsing with jq)
print(json.dumps(params, indent=2))
else:
# Output shell variables (for eval/source)
print(f"ZARR_GROUPS='{params.get('groups', '')}'")
print(f"EXTRA_FLAGS='{params.get('extra_flags', '')}'")
print(f"CHUNK={params.get('spatial_chunk', 4096)}")
print(f"TILE_WIDTH={params.get('tile_width', 512)}")

return 0


if __name__ == "__main__":
sys.exit(main())
128 changes: 107 additions & 21 deletions workflows/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ spec:
tasks:
- name: convert
template: convert-geozarr
- name: validate
template: validate
dependencies: [convert]
- name: register
template: register-stac
dependencies: [convert]
dependencies: [validate]
- name: augment
template: augment-stac
dependencies: [register]

- name: convert-geozarr
activeDeadlineSeconds: 3600 # 1 hour timeout
script:
# Use data-pipeline image with scripts and latest eopf-geozarr
image: ghcr.io/eopf-explorer/data-pipeline:v21
# Use data-pipeline image with scripts and latest eopf-geozarr (v26 includes Dask)
image: ghcr.io/eopf-explorer/data-pipeline:v26
imagePullPolicy: Always
command: [bash]
source: |
Expand All @@ -63,29 +66,31 @@ spec:
echo "Destination: $OUTPUT_PATH"
echo "Collection: $COLLECTION"

# Clean up any partial output from previous failed runs
echo "🧹 Cleaning up any existing output..."
python3 /app/scripts/cleanup_s3_path.py "$OUTPUT_PATH"

# S1 requires different parameters (both prod and test collections)
if [[ "$COLLECTION" == sentinel-1-l1-grd* ]]; then
ZARR_GROUPS="/measurements"
EXTRA_FLAGS="--gcp-group /conditions/gcp"
CHUNK=2048
echo "📡 S1 GRD mode: groups=$ZARR_GROUPS, chunk=$CHUNK"
# Clean up any partial output from previous failed runs (optional)
if [ -f /app/scripts/cleanup_s3_path.py ]; then
echo "🧹 Cleaning up any existing output..."
python3 /app/scripts/cleanup_s3_path.py "$OUTPUT_PATH" || echo "⚠️ Cleanup failed, continuing anyway"
else
ZARR_GROUPS="/quality/l2a_quicklook/r10m"
EXTRA_FLAGS="--crs-groups /quality/l2a_quicklook/r10m"
CHUNK=4096
echo "🗺️ S2 L2A mode: groups=$ZARR_GROUPS, chunk=$CHUNK"
echo "ℹ️ Skipping cleanup (script not available)"
fi

# Build conversion command
# Get collection-specific conversion parameters from registry
echo "📋 Getting conversion parameters for $COLLECTION..."
eval $(python3 /app/scripts/get_conversion_params.py --collection "$COLLECTION")

echo "📡 Conversion mode:"
echo " Groups: $ZARR_GROUPS"
echo " Chunk: $CHUNK"
echo " Tile width: $TILE_WIDTH"
echo " Extra flags: $EXTRA_FLAGS"

# Build conversion command with Dask for parallel processing
eopf-geozarr convert "$ZARR_URL" "$OUTPUT_PATH" \
--groups "$ZARR_GROUPS" \
$EXTRA_FLAGS \
--spatial-chunk $CHUNK \
--tile-width 512 \
--tile-width $TILE_WIDTH \
--dask-cluster \
--verbose
env:
- name: PYTHONUNBUFFERED
Expand All @@ -110,11 +115,46 @@ spec:
memory: "16Gi"
cpu: "4"

- name: validate
activeDeadlineSeconds: 300 # 5 min timeout
container:
image: ghcr.io/eopf-explorer/data-pipeline:v24
imagePullPolicy: Always
command: [python]
args:
- /app/scripts/validate_geozarr.py
- "s3://esa-zarr-sentinel-explorer-fra/tests-output/{{workflow.parameters.register_collection}}/{{workflow.parameters.item_id}}.zarr"
- --item-id
- "{{workflow.parameters.item_id}}"
- --verbose
env:
- name: PYTHONUNBUFFERED
value: "1"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: geozarr-s3-credentials
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: geozarr-s3-credentials
key: AWS_SECRET_ACCESS_KEY
- name: AWS_ENDPOINT_URL
value: "https://s3.de.cloud.ovh.net"
- name: ZARR_V3_EXPERIMENTAL_API
value: "1"
resources:
requests:
memory: "2Gi"
limits:
memory: "4Gi"

- name: register-stac
activeDeadlineSeconds: 300 # 5 min timeout
container:
# Use data-pipeline image for Python scripts (register, augment)
image: ghcr.io/eopf-explorer/data-pipeline:v21
image: ghcr.io/eopf-explorer/data-pipeline:v23
imagePullPolicy: Always
command: [python]
args:
Expand All @@ -137,11 +177,57 @@ spec:
- name: PYTHONUNBUFFERED
value: "1"

- name: benchmark
activeDeadlineSeconds: 600 # 10 min timeout
container:
image: ghcr.io/eopf-explorer/data-pipeline:v23
imagePullPolicy: Always
command: [python]
args:
- /app/scripts/benchmark_comparison.py
- --geozarr-url
- "s3://esa-zarr-sentinel-explorer-fra/tests-output/{{workflow.parameters.register_collection}}/{{workflow.parameters.item_id}}.zarr"
- --eopf-url
- "{{workflow.parameters.source_url}}"
# - --groups # Removed: zarr_groups not available as workflow parameter
# - "{{workflow.parameters.zarr_groups}}"
- --item-id
- "{{workflow.parameters.item_id}}"
- --s3-bucket
- "esa-zarr-sentinel-explorer-fra"
- --windows
- "5"
- --verbose
env:
- name: PYTHONUNBUFFERED
value: "1"
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: geozarr-s3-credentials
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: geozarr-s3-credentials
key: AWS_SECRET_ACCESS_KEY
- name: AWS_ENDPOINT_URL
value: "https://s3.de.io.cloud.ovh.net"
- name: ZARR_V3_EXPERIMENTAL_API
value: "1"
resources:
requests:
memory: "4Gi"
cpu: "1"
limits:
memory: "8Gi"
cpu: "2"

- name: augment-stac
activeDeadlineSeconds: 300 # 5 min timeout
container:
# Use data-pipeline image for Python scripts (register, augment)
image: ghcr.io/eopf-explorer/data-pipeline:v21
image: ghcr.io/eopf-explorer/data-pipeline:v23
imagePullPolicy: Always
command: [python]
args:
Expand Down