diff --git a/README.md b/README.md
index 44f3f6e..29356da 100644
--- a/README.md
+++ b/README.md
@@ -67,6 +67,44 @@ eopf-geozarr validate output.zarr
 
 # Get help
 eopf-geozarr --help
 ```
+
+### Pipeline-Oriented CLI
+
+The package also ships a workflow-friendly entrypoint that mirrors the RabbitMQ Sensor payload used in
+`data-model-pipeline`. Use it for dry runs or smoke tests that mimic the production event flow while
+keeping the canonical payload contract in a single place. Refer to
+[`docs/pipeline-integration.md`](docs/pipeline-integration.md) for the full cross-repository overview.
+
+```bash
+# Inspect available arguments (mirrors Sensor → WorkflowTemplate parameters)
+eopf-geozarr-pipeline run --help
+
+# Replay a payload.json (for example, from data-model-pipeline/workflows/payload.json)
+eopf-geozarr-pipeline run \
+  --src-item "https://stac.core.eopf.eodc.eu/..." \
+  --output-zarr "s3://bucket/path/out.zarr" \
+  --groups measurements/reflectance/r10m,measurements/reflectance/r20m \
+  --register-collection sentinel-2-l2a \
+  --register-url https://api.explorer.eopf.copernicus.eu/stac \
+  --overwrite replace \
+  --metrics-out s3://bucket/metrics/latest.json
+
+# Validate a produced GeoZarr store
+python - <<'PY'
+from eopf_geozarr.pipeline import validate_geozarr_store
+
+report = validate_geozarr_store("s3://bucket/path/out.zarr")
+print(report.summary())
+for line in report.detailed():
+    print(" -", line)
+PY
+```
+
+### Observability and Metrics
+
+Both CLIs now support `--metrics-out`, allowing workflows to persist per-run diagnostics either to the
+local filesystem or directly to `s3://` destinations. The payload helpers expose the same field so Argo
+Workflow templates and RabbitMQ messages stay aligned.
 
 ### S3 Support
diff --git a/docs/pipeline-integration.md b/docs/pipeline-integration.md
new file mode 100644
index 0000000..b02808e
--- /dev/null
+++ b/docs/pipeline-integration.md
@@ -0,0 +1,89 @@
+# Pipeline Integration Overview
+
+The `eopf-geozarr` package provides the conversion logic that powers the GeoZarr workflow in
+`data-model-pipeline`. This guide highlights the contract between both repositories so that developer
+workflows, Argo templates, and RabbitMQ messages remain synchronized.
+
+## Shared Responsibilities
+
+| Concern | Provided by `data-model` | Provided by `data-model-pipeline` |
+| --- | --- | --- |
+| GeoZarr conversion & validation | `eopf_geozarr` conversion engine, CLI, pipeline runner, payload schema | Invokes library within Argo Workflows, publishes AMQP payloads |
+| Payload contract | `GeoZarrPayload` dataclass, JSON schema, bundled fixtures | Sensors and tests consume the shared helpers |
+| Observability | `--metrics-out` flag routes metrics to local or S3 destinations | Workflows collect and forward metrics to long-term storage |
+| STAC registration helpers | `validate_geozarr_store`, pipeline runner hooks | Orchestrates STAC Transactions based on payload flags |
+
+## Command-Line Surfaces
+
+### `eopf-geozarr`
+
+The original CLI remains the most direct way to convert EOPF datasets. It now accepts
+`--metrics-out` so you can persist run summaries alongside converted assets. Metrics targets support
+both local paths and S3 URIs.
+
+### `eopf-geozarr-pipeline`
+
+The pipeline-specific entrypoint mirrors the RabbitMQ payload processed by production Argo sensors.
+It is ideal for replaying payloads locally or verifying template changes: + +```bash +# Validate payload flags before triggering the workflow +$ eopf-geozarr-pipeline run --help + +# Replay a bundled example payload +$ eopf-geozarr-pipeline run --payload-file <(python - <<'PY' +from eopf_geozarr.pipeline import load_example_payload +import json +print(json.dumps(load_example_payload("minimal"))) +PY +) +``` + +Both CLIs normalize group names, default to the Sentinel-2 reflectance groups, and respect the shared +payload schema described below. + +## Python Helpers + +The `eopf_geozarr.pipeline` package exposes helpers that keep repositories aligned: + +```python +from eopf_geozarr.pipeline import ( + GeoZarrPayload, + PAYLOAD_JSON_SCHEMA, + get_payload_schema, + load_example_payload, + run_pipeline, + validate_payload, +) + +payload = GeoZarrPayload.from_payload(load_example_payload("full")) +payload.ensure_required() +validate_payload(payload.to_payload()) +print(PAYLOAD_JSON_SCHEMA["required"]) # ["src_item", "output_zarr"] +``` + +- `GeoZarrPayload` parses CLI arguments or RabbitMQ payloads and produces normalized values. +- `PAYLOAD_JSON_SCHEMA` and `get_payload_schema()` deliver a canonical JSON schema for validation in + tests or runtime checks. +- `load_example_payload()` exposes fixtures that mirror the messages published by the AMQP tooling. +- `validate_payload()` wraps `jsonschema` with the library-managed schema, ensuring the same rules + apply everywhere. + +## Bundled Fixtures + +Two JSON fixtures live under `eopf_geozarr/pipeline/resources`: + +- `payload-minimal.json` represents the baseline message with only required fields. +- `payload-full.json` exercises optional knobs such as STAC registration and metrics targets. + +Use these fixtures to seed integration tests in `data-model-pipeline` or to document payload +expectations in other repositories. They are also accessible at runtime via `load_example_payload()`. + +## Next Steps + +- `data-model-pipeline` should import the schema helpers when validating AMQP payloads and update its + tests to rely on the shared fixtures. +- `platform-deploy` can reference the same schema when templating new WorkflowTemplates or Flux + overlays, ensuring environment values stay in sync. +- Future payload changes should originate in this repository so all downstream consumers inherit the + update automatically. 
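As an illustration of the first next step above, a downstream consumer such as `data-model-pipeline` could pin its AMQP contract to the shared schema with a test along these lines (a sketch that only uses the helper names documented in this guide; the test name and fixture choice are illustrative):

```python
from eopf_geozarr.pipeline import (
    GeoZarrPayload,
    load_example_payload,
    validate_payload,
)


def test_amqp_payload_matches_shared_schema() -> None:
    # Bundled fixture that mirrors a message published by the AMQP tooling.
    raw = load_example_payload("minimal")

    # validate_payload raises if a required field (src_item, output_zarr) is missing.
    validate_payload(raw)

    # Round-trip through the shared dataclass to confirm the normalized values
    # still satisfy the same schema.
    payload = GeoZarrPayload.from_payload(raw)
    payload.ensure_required()
    validate_payload(payload.to_payload())
```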
diff --git a/mkdocs.yml b/mkdocs.yml index 36d9f4a..4f4f9d6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -63,6 +63,7 @@ nav: - Installation: installation.md - Quick Start: quickstart.md - User Guide: converter.md + - Pipeline Integration: pipeline-integration.md - API Reference: api-reference.md - Examples: examples.md - Architecture: architecture.md diff --git a/pyproject.toml b/pyproject.toml index 56c483e..fbf65ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,7 +126,21 @@ warn_unreachable = true strict_equality = true [[tool.mypy.overrides]] -module = ["zarr.*", "xarray.*", "rioxarray.*", "cf_xarray.*", "dask.*"] +module = [ + "zarr.*", + "xarray.*", + "rioxarray.*", + "cf_xarray.*", + "dask.*", + "jsonschema", + "jsonschema.*", + "s3fs", + "s3fs.*", + "fsspec", + "fsspec.*", + "rasterio", + "rasterio.*", +] ignore_missing_imports = true [tool.pytest.ini_options] @@ -141,10 +155,7 @@ markers = [ [tool.coverage.run] source = ["src"] -omit = [ - "tests/*", - "setup.py", -] +omit = ["tests/*", "setup.py"] [tool.coverage.report] exclude_lines = [ diff --git a/src/eopf_geozarr/__init__.py b/src/eopf_geozarr/__init__.py index bb366e0..880943f 100644 --- a/src/eopf_geozarr/__init__.py +++ b/src/eopf_geozarr/__init__.py @@ -13,18 +13,20 @@ setup_datatree_metadata_geozarr_spec_compliant, validate_existing_band_data, ) +from .validator import validate_geozarr_store __version__ = version("eopf-geozarr") __all__ = [ "__version__", - "create_geozarr_dataset", - "setup_datatree_metadata_geozarr_spec_compliant", - "iterative_copy", - "consolidate_metadata", "async_consolidate_metadata", - "downsample_2d_array", "calculate_aligned_chunk_size", + "consolidate_metadata", + "create_geozarr_dataset", + "downsample_2d_array", "is_grid_mapping_variable", + "iterative_copy", + "setup_datatree_metadata_geozarr_spec_compliant", "validate_existing_band_data", + "validate_geozarr_store", ] diff --git a/src/eopf_geozarr/cli.py b/src/eopf_geozarr/cli.py index 78ff3cc..f2beef3 100644 --- a/src/eopf_geozarr/cli.py +++ b/src/eopf_geozarr/cli.py @@ -6,9 +6,13 @@ """ import argparse +import json import sys +import time +from datetime import datetime, timezone +from importlib.metadata import PackageNotFoundError, version from pathlib import Path -from typing import Any, Optional +from typing import Any import xarray as xr @@ -21,7 +25,34 @@ ) -def setup_dask_cluster(enable_dask: bool, verbose: bool = False) -> Optional[Any]: +def _package_version() -> str: + try: + return version("eopf-geozarr") + except PackageNotFoundError: # pragma: no cover - fallback for editable installs + return "unknown" + + +def _write_metrics(path: str, payload: dict[str, Any]) -> None: + target = path.strip() + if not target: + return + content = json.dumps(payload, indent=2) + "\n" + if is_s3_path(target): + try: + import fsspec + except ImportError as exc: # pragma: no cover - dependency guard + raise RuntimeError( + "Writing metrics to S3 requires 'fsspec'; install with the s3 extras" + ) from exc + with fsspec.open(target, "w", encoding="utf-8") as stream: + stream.write(content) + return + metrics_path = Path(target) + metrics_path.parent.mkdir(parents=True, exist_ok=True) + metrics_path.write_text(content, encoding="utf-8") + + +def setup_dask_cluster(enable_dask: bool, verbose: bool = False) -> Any | None: """ Set up a dask cluster for parallel processing. @@ -57,7 +88,7 @@ def setup_dask_cluster(enable_dask: bool, verbose: bool = False) -> Optional[Any except ImportError: print( - "❌ Error: dask.distributed not available. 
Install with: pip install 'dask[distributed]'"
+            "❌ Error: dask.distributed not available. Install via pip install 'dask[distributed]'"
         )
         sys.exit(1)
     except Exception as e:
@@ -79,6 +110,9 @@ def convert_command(args: argparse.Namespace) -> None:
         enable_dask=getattr(args, "dask_cluster", False), verbose=args.verbose
     )
 
+    started_at = time.perf_counter()
+    started_ts = datetime.now(timezone.utc)
+
     try:
         # Validate input path (handle both local paths and URLs)
         input_path_str = args.input_path
@@ -171,6 +205,30 @@ def convert_command(args: argparse.Namespace) -> None:
         print("✅ Successfully converted EOPF dataset to GeoZarr format")
         print(f"Output saved to: {output_path}")
 
+        metrics_target = (getattr(args, "metrics_out", None) or "").strip()
+        if metrics_target:
+            finished_ts = datetime.now(timezone.utc)
+            metrics = {
+                "src_item": input_path,
+                "output_zarr": output_path,
+                "groups": list(args.groups or []),
+                "crs_groups": list(args.crs_groups or []),
+                "spatial_chunk": args.spatial_chunk,
+                "min_dimension": args.min_dimension,
+                "tile_width": args.tile_width,
+                "dask_cluster": bool(getattr(args, "dask_cluster", False)),
+                "max_retries": args.max_retries,
+                "duration_seconds": round(time.perf_counter() - started_at, 3),
+                "started_at": started_ts.isoformat(),
+                "finished_at": finished_ts.isoformat(),
+                "version": _package_version(),
+            }
+            try:
+                _write_metrics(metrics_target, metrics)
+                print(f"📈 Metrics written to {metrics_target}")
+            except Exception as exc:  # pragma: no cover - diagnostics only
+                print(f"⚠️ Warning: failed to write metrics to {metrics_target}: {exc}")
+
         if args.verbose:
             # Check if dt_geozarr is a DataTree or Dataset
             if hasattr(dt_geozarr, "children"):
@@ -273,7 +331,7 @@ def _generate_optimized_tree_html(dt: xr.DataTree) -> str:
     """
 
     def has_meaningful_content(node: Any) -> bool:
-        """Check if a node has meaningful content (data variables, attributes, or meaningful children)."""
+        """Check if a node has data, attributes, or meaningful child nodes."""
         if hasattr(node, "ds") and node.ds is not None:
             # Has data variables
             if hasattr(node.ds, "data_vars") and len(node.ds.data_vars) > 0:
@@ -316,7 +374,9 @@ def format_data_vars(data_vars: Any) -> str:
             # Fallback to simple format if xarray HTML fails
             vars_html = []
             for name, var in data_vars.items():
-                dims_str = format_dimensions(dict(zip(var.dims, var.shape)))
+                dims_str = format_dimensions(
+                    dict(zip(var.dims, var.shape, strict=True))
+                )
                 dtype_str = str(var.dtype)
                 vars_html.append(
                     f"""
@@ -549,7 +612,9 @@ def render_node(node: Any, path: str = "", level: int = 0) -> str:
         }}
 
         .var-name {{
-            font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
+            font-family:
+                'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono',
+                Consolas, 'Courier New', monospace;
             font-weight: 600;
             color: #0969da;
             min-width: 120px;
@@ -563,7 +628,9 @@ def render_node(node: Any, path: str = "", level: int = 0) -> str:
 
         .var-dtype {{
             color: #1a7f37;
-            font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
+            font-family:
+                'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono',
+                Consolas, 'Courier New', monospace;
             font-size: 0.85em;
             font-weight: 500;
             background-color: #f6f8fa;
@@ -586,7 +653,9 @@ def render_node(node: Any, path: str = "", level: int = 0) -> str:
 
         .attr-value {{
             color: #656d76;
-            font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
+            font-family:
+                'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono',
+                Consolas, 'Courier New', monospace;
             font-size: 0.85em;
         }}
 
@@ -638,7 +707,9 @@ def _generate_html_output(