diff --git a/scripts/get_conversion_params.py b/scripts/get_conversion_params.py
new file mode 100644
index 0000000..7be4663
--- /dev/null
+++ b/scripts/get_conversion_params.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+"""Generate GeoZarr conversion parameters from collection registry.
+
+This script exports conversion parameters (groups, flags, chunks) for
+different satellite collections, enabling the workflow template to use
+data-driven configuration instead of hard-coded bash conditionals.
+
+Usage:
+    python3 get_conversion_params.py --collection sentinel-1-l1-grd
+    python3 get_conversion_params.py --collection sentinel-2-l2a --format json
+    python3 get_conversion_params.py --collection sentinel-2-l2a --param groups
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from typing import Any
+
+# Import collection configs from augment_stac_item
+# In production, this would be a shared module
+_COLLECTION_CONFIGS: dict[str, dict[str, Any]] = {
+    "sentinel-1-l1-grd": {
+        "pattern": "sentinel-1-l1-grd*",
+        "conversion": {
+            "groups": "/measurements",
+            "extra_flags": "--gcp-group /conditions/gcp",
+            "spatial_chunk": 2048,
+            "tile_width": 512,
+        },
+    },
+    "sentinel-2-l2a": {
+        "pattern": "sentinel-2-l2a*",
+        "conversion": {
+            "groups": "/quality/l2a_quicklook/r10m",
+            "extra_flags": "--crs-groups /quality/l2a_quicklook/r10m",
+            "spatial_chunk": 4096,
+            "tile_width": 512,
+        },
+    },
+}
+
+_DEFAULT_COLLECTION = "sentinel-2-l2a"
+
+
+def _match_collection_config(collection_id: str) -> dict[str, Any] | None:
+    """Return the registry entry whose pattern prefix matches ``collection_id``.
+
+    Patterns are plain prefixes with an optional trailing ``*``; entries are
+    tried in registry order and the first match wins.
+    """
+    for config in _COLLECTION_CONFIGS.values():
+        prefix = str(config.get("pattern", "")).rstrip("*")
+        # An empty prefix would match every collection ID, so skip it.
+        if prefix and collection_id.startswith(prefix):
+            return config
+    return None
+
+
+def get_conversion_params(collection_id: str) -> dict[str, Any]:
+    """Get conversion parameters for collection.
+
+    Collections that match no registry pattern fall back to the
+    ``_DEFAULT_COLLECTION`` entry.
+
+    Args:
+        collection_id: Collection identifier (e.g., sentinel-1-l1-grd-dp-test)
+
+    Returns:
+        Copy of the conversion parameters (groups, extra_flags, spatial_chunk,
+        tile_width); mutating it does not change the registry
+
+    Raises:
+        ValueError: If nothing matches and the default entry is missing
+    """
+    config = _match_collection_config(collection_id)
+    if not config:
+        config = _COLLECTION_CONFIGS.get(_DEFAULT_COLLECTION)
+    if not config:
+        raise ValueError(f"No config for collection {collection_id}")
+
+    conversion: dict[str, Any] = config.get("conversion", {})
+    return dict(conversion)
+
+
+def _shell_quote(value: object) -> str:
+    """Return ``value`` as one single-quoted, literal shell word.
+
+    The workflow ``eval``s the shell output, so embedded single quotes are
+    escaped to keep a value from terminating its own quoting.
+    """
+    return "'" + str(value).replace("'", "'\"'\"'") + "'"
+
+
+def main(argv: list[str] | None = None) -> int:
+    """Parse CLI arguments, print the requested parameters, return exit code."""
+    parser = argparse.ArgumentParser(
+        description="Get GeoZarr conversion parameters from collection registry"
+    )
+    parser.add_argument(
+        "--collection",
+        required=True,
+        help="Collection ID (e.g., sentinel-1-l1-grd, sentinel-2-l2a-dp-test)",
+    )
+    parser.add_argument(
+        "--format",
+        choices=["shell", "json"],
+        default="shell",
+        help="Output format (shell vars or JSON)",
+    )
+    parser.add_argument(
+        "--param",
+        choices=["groups", "extra_flags", "spatial_chunk", "tile_width"],
+        help="Get single parameter (for shell scripts)",
+    )
+
+    args = parser.parse_args(argv)
+
+    try:
+        params = get_conversion_params(args.collection)
+    except ValueError as exc:
+        print(f"Error: {exc}", file=sys.stderr)
+        return 1
+
+    if args.param:
+        # Output single parameter (for shell variable assignment)
+        value = params.get(args.param, "")
+        print(value)
+    elif args.format == "json":
+        # Output JSON (for parsing with jq)
+        print(json.dumps(params, indent=2))
+    else:
+        # Output shell variables (for eval/source)
+        print(f"ZARR_GROUPS={_shell_quote(params.get('groups', ''))}")
+        print(f"EXTRA_FLAGS={_shell_quote(params.get('extra_flags', ''))}")
+        print(f"CHUNK={params.get('spatial_chunk', 4096)}")
+        print(f"TILE_WIDTH={params.get('tile_width', 512)}")
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/workflows/template.yaml b/workflows/template.yaml
index 3d0df82..a279a06 100644
--- a/workflows/template.yaml
+++ b/workflows/template.yaml
@@ -26,9 +26,12 @@ spec:
       tasks:
       - name: convert
         template: convert-geozarr
+      - name: validate
+        template: validate
+        dependencies: [convert]
       - name: register
         template: register-stac
-        dependencies: [convert]
+        dependencies: [validate]
       - name: augment
         template: augment-stac
         dependencies: [register]
@@ -36,8 +39,8 @@ spec:
   - name: convert-geozarr
     activeDeadlineSeconds: 3600  # 1 hour timeout
     script:
-      # Use data-pipeline image with scripts and latest eopf-geozarr
-      image: ghcr.io/eopf-explorer/data-pipeline:v21
+      # Use data-pipeline image with scripts and latest eopf-geozarr (v26 includes Dask)
+      image: ghcr.io/eopf-explorer/data-pipeline:v26
       imagePullPolicy: Always
       command: [bash]
       source: |
@@ -63,29 +66,35 @@ spec:
         echo "Destination: $OUTPUT_PATH"
         echo "Collection: $COLLECTION"
 
-        # Clean up any partial output from previous failed runs
-        echo "🧹 Cleaning up any existing output..."
-        python3 /app/scripts/cleanup_s3_path.py "$OUTPUT_PATH"
-
-        # S1 requires different parameters (both prod and test collections)
-        if [[ "$COLLECTION" == sentinel-1-l1-grd* ]]; then
-          ZARR_GROUPS="/measurements"
-          EXTRA_FLAGS="--gcp-group /conditions/gcp"
-          CHUNK=2048
-          echo "📡 S1 GRD mode: groups=$ZARR_GROUPS, chunk=$CHUNK"
+        # Clean up any partial output from previous failed runs (optional)
+        if [ -f /app/scripts/cleanup_s3_path.py ]; then
+          echo "🧹 Cleaning up any existing output..."
+          python3 /app/scripts/cleanup_s3_path.py "$OUTPUT_PATH" || echo "⚠️ Cleanup failed, continuing anyway"
         else
-          ZARR_GROUPS="/quality/l2a_quicklook/r10m"
-          EXTRA_FLAGS="--crs-groups /quality/l2a_quicklook/r10m"
-          CHUNK=4096
-          echo "🗺️ S2 L2A mode: groups=$ZARR_GROUPS, chunk=$CHUNK"
+          echo "ℹ️ Skipping cleanup (script not available)"
         fi
 
-        # Build conversion command
+        # Get collection-specific conversion parameters from registry.
+        echo "📋 Getting conversion parameters for $COLLECTION..."
+        if ! CONVERSION_PARAMS="$(python3 /app/scripts/get_conversion_params.py --collection "$COLLECTION")"; then
+          echo "❌ Failed to get conversion parameters for $COLLECTION" >&2
+          exit 1
+        fi
+        eval "$CONVERSION_PARAMS"  # quoted: no word splitting or globbing before eval
+
+        echo "📡 Conversion mode:"
+        echo " Groups: $ZARR_GROUPS"
+        echo " Chunk: $CHUNK"
+        echo " Tile width: $TILE_WIDTH"
+        echo " Extra flags: $EXTRA_FLAGS"
+
+        # Build conversion command with Dask for parallel processing
         eopf-geozarr convert "$ZARR_URL" "$OUTPUT_PATH" \
           --groups "$ZARR_GROUPS" \
           $EXTRA_FLAGS \
           --spatial-chunk $CHUNK \
-          --tile-width 512 \
+          --tile-width "$TILE_WIDTH" \
+          --dask-cluster \
           --verbose
       env:
       - name: PYTHONUNBUFFERED
@@ -110,11 +119,46 @@ spec:
           memory: "16Gi"
           cpu: "4"
 
+  - name: validate
+    activeDeadlineSeconds: 300  # 5 min timeout
+    container:
+      image: ghcr.io/eopf-explorer/data-pipeline:v24
+      imagePullPolicy: Always
+      command: [python]
+      args:
+      - /app/scripts/validate_geozarr.py
+      - "s3://esa-zarr-sentinel-explorer-fra/tests-output/{{workflow.parameters.register_collection}}/{{workflow.parameters.item_id}}.zarr"
+      - --item-id
+      - "{{workflow.parameters.item_id}}"
+      - --verbose
+      env:
+      - name: PYTHONUNBUFFERED
+        value: "1"
+      - name: AWS_ACCESS_KEY_ID
+        valueFrom:
+          secretKeyRef:
+            name: geozarr-s3-credentials
+            key: AWS_ACCESS_KEY_ID
+      - name: AWS_SECRET_ACCESS_KEY
+        valueFrom:
+          secretKeyRef:
+            name: geozarr-s3-credentials
+            key: AWS_SECRET_ACCESS_KEY
+      - name: AWS_ENDPOINT_URL
+        value: "https://s3.de.cloud.ovh.net"  # NOTE(review): benchmark template uses s3.de.io.cloud.ovh.net - confirm and align
+      - name: ZARR_V3_EXPERIMENTAL_API
+        value: "1"
+      resources:
+        requests:
+          memory: "2Gi"
+        limits:
+          memory: "4Gi"
+
   - name: register-stac
     activeDeadlineSeconds: 300  # 5 min timeout
     container:
       # Use data-pipeline image for Python scripts (register, augment)
-      image: ghcr.io/eopf-explorer/data-pipeline:v21
+      image: ghcr.io/eopf-explorer/data-pipeline:v23
       imagePullPolicy: Always
       command: [python]
       args:
@@ -137,11 +181,57 @@ spec:
       - name: PYTHONUNBUFFERED
         value: "1"
 
+  - name: benchmark  # NOTE(review): no DAG task in this patch references this template - confirm how it is invoked
+    activeDeadlineSeconds: 600  # 10 min timeout
+    container:
+      image: ghcr.io/eopf-explorer/data-pipeline:v23
+      imagePullPolicy: Always
+      command: [python]
+      args:
+      - /app/scripts/benchmark_comparison.py
+      - --geozarr-url
+      - "s3://esa-zarr-sentinel-explorer-fra/tests-output/{{workflow.parameters.register_collection}}/{{workflow.parameters.item_id}}.zarr"
+      - --eopf-url
+      - "{{workflow.parameters.source_url}}"
+      # --groups is intentionally not passed: zarr_groups is not available
+      # as a workflow parameter.
+      - --item-id
+      - "{{workflow.parameters.item_id}}"
+      - --s3-bucket
+      - "esa-zarr-sentinel-explorer-fra"
+      - --windows
+      - "5"
+      - --verbose
+      env:
+      - name: PYTHONUNBUFFERED
+        value: "1"
+      - name: AWS_ACCESS_KEY_ID
+        valueFrom:
+          secretKeyRef:
+            name: geozarr-s3-credentials
+            key: AWS_ACCESS_KEY_ID
+      - name: AWS_SECRET_ACCESS_KEY
+        valueFrom:
+          secretKeyRef:
+            name: geozarr-s3-credentials
+            key: AWS_SECRET_ACCESS_KEY
+      - name: AWS_ENDPOINT_URL
+        value: "https://s3.de.io.cloud.ovh.net"
+      - name: ZARR_V3_EXPERIMENTAL_API
+        value: "1"
+      resources:
+        requests:
+          memory: "4Gi"
+          cpu: "1"
+        limits:
+          memory: "8Gi"
+          cpu: "2"
+
   - name: augment-stac
     activeDeadlineSeconds: 300  # 5 min timeout
     container:
       # Use data-pipeline image for Python scripts (register, augment)
-      image: ghcr.io/eopf-explorer/data-pipeline:v21
+      image: ghcr.io/eopf-explorer/data-pipeline:v23
       imagePullPolicy: Always
       command: [python]
       args: