diff --git a/scripts/benchmark_comparison.py b/scripts/benchmark_comparison.py
new file mode 100755
index 0000000..6890bce
--- /dev/null
+++ b/scripts/benchmark_comparison.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python3
+"""Benchmark GeoZarr vs EOPF Zarr random access performance.
+
+Compares real-world access patterns:
+- EOPF: xr.open_datatree with the zarr engine (hierarchical datatree)
+- GeoZarr: xr.open_dataset with the zarr v3 engine (individual bands)
+
+Both use the same access pattern: load RGB composite (b04, b03, b02) windows.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import os
+import random
+import time
+from datetime import UTC, datetime
+
+import fsspec
+import numpy as np
+import xarray as xr
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def open_eopf_datatree(url: str) -> xr.Dataset:
+    """Open EOPF zarr using a datatree (production access method).
+
+    Returns a Dataset with multiple bands (b02, b03, b04, etc.).
+    """
+    logger.info(f"Opening EOPF datatree: {url}")
+    dt = xr.open_datatree(url, engine="zarr", consolidated=False)
+
+    # Navigate to the first group that carries data variables
+    for node in dt.subtree:
+        if node.ds and len(node.ds.data_vars) > 0:
+            logger.info(f"Found node: {node.path}, variables: {list(node.ds.data_vars.keys())}")
+            return node.ds
+
+    raise ValueError(f"No data variables found in {url}")
+
+
+def open_geozarr_bands(base_url: str, bands: list[str]) -> xr.Dataset:
+    """Open GeoZarr individual band arrays and combine them into a Dataset.
+
+    Args:
+        base_url: Base S3 URL to the measurement group (e.g., .../r10m)
+        bands: List of band names to load (e.g., ['b02', 'b03', 'b04'])
+
+    Returns:
+        Dataset with the requested bands as data variables
+    """
+    logger.info(f"Opening GeoZarr bands: {bands}")
+
+    # Set up the S3 filesystem
+    endpoint = os.getenv("AWS_ENDPOINT_URL", "https://s3.de.cloud.ovh.net")
+    fs = fsspec.filesystem(
+        "s3",
+        key=os.getenv("AWS_ACCESS_KEY_ID"),
+        secret=os.getenv("AWS_SECRET_ACCESS_KEY"),
+        endpoint_url=endpoint,
+    )
+
+    data_vars = {}
+    for band in bands:
+        band_url = f"{base_url}/{band}"
+        logger.info(f"  Loading band: {band} from {band_url}")
+        store = fs.get_mapper(band_url)
+
+        # Open as an xarray DataArray directly (zarr v3 array)
+        da = xr.open_dataarray(store, engine="zarr", consolidated=False)
+        data_vars[band] = da
+
+    # Combine into a single Dataset
+    combined = xr.Dataset(data_vars)
+    logger.info(f"Combined GeoZarr dataset: {list(combined.data_vars.keys())}")
+    return combined
+
+
+def open_zarr(url: str, is_geozarr: bool = False) -> xr.Dataset:
+    """Open a zarr dataset using the appropriate method for its format.
+
+    Args:
+        url: URL to zarr store
+        is_geozarr: If True, treat as GeoZarr (individual bands), else EOPF
+
+    Returns:
+        xarray Dataset with data variables
+    """
+    if is_geozarr:
+        # GeoZarr: individual band arrays at base_url/b02, base_url/b03, etc.
+        # Extract the base URL (drop the band name if present)
+        base_url = url.rsplit("/", 1)[0] if url.endswith(("/b02", "/b03", "/b04", "/b08")) else url
+
+        # Load RGB bands for a typical tile request
+        return open_geozarr_bands(base_url, ["b02", "b03", "b04"])
+    else:
+        # EOPF: hierarchical datatree via xr.open_datatree
+        return open_eopf_datatree(url)
+
+
+def benchmark(
+    url: str, is_geozarr: bool = False, num_windows: int = 5, window_size: int = 512
+) -> dict:
+    """Benchmark random window access on a zarr dataset.
+
+    Simulates typical map tile requests by reading RGB composite windows.
+    For GeoZarr, loads 3 bands (b02, b03, b04). For EOPF, uses the same bands
+    from the dataset.
+    """
+    ds = open_zarr(url, is_geozarr=is_geozarr)
+
+    # Get dimensions and bands
+    bands = ["b02", "b03", "b04"]
+    available_bands = [b for b in bands if b in ds.data_vars]
+
+    if not available_bands:
+        # Fallback: use the first 3 variables
+        available_bands = list(ds.data_vars.keys())[:3]
+        logger.warning(f"RGB bands not found, using: {available_bands}")
+
+    logger.info(f"Benchmarking bands: {available_bands}")
+
+    # Get spatial dimensions from the first band
+    first_var = ds[available_bands[0]]
+    # Find the y and x dimensions (usually the last two dims)
+    dims = list(first_var.dims)
+    y_dim, x_dim = dims[-2], dims[-1]
+    y_size, x_size = first_var.sizes[y_dim], first_var.sizes[x_dim]
+
+    logger.info(f"Array dimensions: {y_size}×{x_size} ({y_dim}, {x_dim})")
+
+    if y_size < window_size or x_size < window_size:
+        raise ValueError(f"Array too small: {y_size}×{x_size} < {window_size}")
+
+    times = []
+    for i in range(num_windows):
+        y = random.randint(0, y_size - window_size)
+        x = random.randint(0, x_size - window_size)
+
+        start = time.perf_counter()
+
+        # Read all RGB bands for this window (typical tile request)
+        for band in available_bands:
+            data = ds[band]
+            window = data.isel({y_dim: slice(y, y + window_size), x_dim: slice(x, x + window_size)})
+            _ = window.compute()  # Force evaluation
+
+        elapsed = time.perf_counter() - start
+        times.append(elapsed)
+        logger.info(f"  Window {i + 1}: {elapsed:.3f}s ({len(available_bands)} bands)")
+
+    return {
+        "avg": float(np.mean(times)),
+        "std": float(np.std(times)),
+        "min": float(np.min(times)),
+        "max": float(np.max(times)),
+        "times": [float(t) for t in times],
+        "bands_per_window": len(available_bands),
+    }
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Benchmark GeoZarr vs EOPF zarr access using real-world patterns"
+    )
+    parser.add_argument("--geozarr-url", required=True, help="GeoZarr measurement group URL")
+    parser.add_argument("--eopf-url", required=True, help="EOPF measurement group URL")
+    parser.add_argument("--item-id", required=True)
+    parser.add_argument("--windows", type=int, default=5)
+    parser.add_argument("--window-size", type=int, default=512)
+    args = parser.parse_args()
+
+    logger.info("=== GeoZarr (individual band arrays) ===")
+    geo = benchmark(
+        args.geozarr_url, is_geozarr=True, num_windows=args.windows, window_size=args.window_size
+    )
+
+    logger.info("=== EOPF (hierarchical datatree) ===")
+    eopf = benchmark(
+        args.eopf_url, is_geozarr=False, num_windows=args.windows, window_size=args.window_size
+    )
+
+    speedup = eopf["avg"] / geo["avg"]
+
+    results = {
+        "timestamp": datetime.now(UTC).isoformat(),
+        "item_id": args.item_id,
+        "config": {
+            "windows": args.windows,
+            "window_size": args.window_size,
+            "access_pattern": "RGB composite (3 bands)",
+        },
+        "geozarr": {"url": args.geozarr_url, **geo},
+        "eopf": {"url": args.eopf_url, **eopf},
+        "speedup": round(speedup, 2),
+        "geozarr_faster": speedup > 1.0,
+    }
+    logger.info(f"\n{'=' * 60}")
+    logger.info(
+        f"GeoZarr: {geo['avg']:.3f}s ± {geo['std']:.3f}s ({geo['bands_per_window']} bands/window)"
+    )
+    logger.info(
+        f"EOPF:    {eopf['avg']:.3f}s ± {eopf['std']:.3f}s ({eopf['bands_per_window']} bands/window)"
+    )
+    logger.info(f"Speedup: {speedup:.2f}× ({'GeoZarr' if speedup > 1 else 'EOPF'} faster)")
+    logger.info(f"{'=' * 60}\n")
+    print(json.dumps(results, indent=2))
+
+
+if __name__ == "__main__":
+    main()
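# Editor's sketch (not part of the diff): a minimal consumer for the JSON
# summary that benchmark_comparison.py prints to stdout. Assumes stdout was
# redirected to results.json; the key names match the `results` dict built
# in main() above.
import json

with open("results.json") as f:
    results = json.load(f)

winner = "GeoZarr" if results["geozarr_faster"] else "EOPF"
print(
    f"{winner} faster: {results['speedup']}x "
    f"(GeoZarr {results['geozarr']['avg']:.3f}s vs EOPF {results['eopf']['avg']:.3f}s)"
)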
diff --git a/scripts/benchmark_tile_performance.py b/scripts/benchmark_tile_performance.py
new file mode 100644
index 0000000..8171d88
--- /dev/null
+++ b/scripts/benchmark_tile_performance.py
@@ -0,0 +1,384 @@
+#!/usr/bin/env python3
+"""Benchmark tile generation performance for GeoZarr datasets.
+
+This script measures end-to-end tile generation latency via the titiler-eopf
+raster API. It demonstrates the actual user-facing performance improvements
+of GeoZarr over direct EOPF access.
+
+Usage:
+    python benchmark_tile_performance.py \\
+        --stac-api https://api.explorer.eopf.copernicus.eu/stac \\
+        --raster-api https://api.explorer.eopf.copernicus.eu/raster \\
+        --collection sentinel-2-l2a \\
+        --item-id S2A_MSIL2A_... \\
+        --num-tiles 20 \\
+        --zoom-levels 10,11,12
+"""
+
+import argparse
+import json
+import logging
+import random
+import sys
+import time
+from typing import Any
+from urllib.parse import urlencode
+
+import requests  # type: ignore[import-untyped]
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def fetch_item(stac_api: str, collection: str, item_id: str) -> dict[str, Any]:
+    """Fetch a STAC item from the API."""
+    url = f"{stac_api}/collections/{collection}/items/{item_id}"
+    logger.info(f"Fetching STAC item: {url}")
+    resp = requests.get(url, timeout=30)
+    resp.raise_for_status()
+    return resp.json()  # type: ignore[no-any-return]
+
+
+def get_tile_url(raster_api: str, collection: str, item_id: str, z: int, x: int, y: int) -> str:
+    """Construct a tile URL for the given z/x/y coordinates."""
+    base = f"{raster_api}/collections/{collection}/items/{item_id}"
+    return f"{base}/tiles/WebMercatorQuad/{z}/{x}/{y}.png"
+
+
+def generate_tile_coordinates(zoom: int, num_tiles: int) -> list[tuple[int, int, int]]:
+    """Generate random tile coordinates for a given zoom level.
+
+    Note: coordinates are sampled across the whole WebMercatorQuad grid, so at
+    higher zooms most tiles will fall outside the item's footprint.
+
+    Args:
+        zoom: Zoom level (0-20)
+        num_tiles: Number of random tiles to generate
+
+    Returns:
+        List of (z, x, y) tuples
+    """
+    max_coord = 2**zoom
+    coords = []
+    for _ in range(num_tiles):
+        x = random.randint(0, max_coord - 1)
+        y = random.randint(0, max_coord - 1)
+        coords.append((zoom, x, y))
+    return coords
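# Editor's sketch (not part of the diff): sampling tiles from the item's bbox
# instead of the whole grid, so that benchmarked tiles actually intersect the
# data. Standard Web Mercator tile math; `bbox` is the STAC item's
# [west, south, east, north] in degrees.
import math
import random


def tile_range_for_bbox(bbox: list[float], zoom: int) -> tuple[range, range]:
    """Return the (x, y) tile index ranges covering bbox at the given zoom."""

    def to_tile(lon: float, lat: float) -> tuple[int, int]:
        n = 2**zoom
        x = int((lon + 180.0) / 360.0 * n)
        y = int((1.0 - math.asinh(math.tan(math.radians(lat))) / math.pi) / 2.0 * n)
        return min(max(x, 0), n - 1), min(max(y, 0), n - 1)

    x_min, y_max = to_tile(bbox[0], bbox[1])  # west, south -> smallest x, largest y
    x_max, y_min = to_tile(bbox[2], bbox[3])  # east, north -> largest x, smallest y
    return range(x_min, x_max + 1), range(y_min, y_max + 1)


# e.g.: xs, ys = tile_range_for_bbox(item["bbox"], 12)
#       coords = [(12, random.choice(xs), random.choice(ys)) for _ in range(20)]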
+
+
+def benchmark_tile(
+    raster_api: str,
+    collection: str,
+    item_id: str,
+    z: int,
+    x: int,
+    y: int,
+    params: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+    """Fetch a single tile and measure latency.
+
+    Args:
+        raster_api: Base raster API URL
+        collection: Collection ID
+        item_id: Item ID
+        z, x, y: Tile coordinates
+        params: Optional query parameters (e.g., assets, rescale)
+
+    Returns:
+        Dictionary with timing metrics and response info
+    """
+    url = get_tile_url(raster_api, collection, item_id, z, x, y)
+    if params:
+        url = f"{url}?{urlencode(params)}"
+
+    start = time.perf_counter()
+    try:
+        resp = requests.get(url, timeout=60)
+        elapsed = time.perf_counter() - start
+
+        success = resp.status_code == 200
+        size_bytes = len(resp.content) if success else 0
+
+        return {
+            "z": z,
+            "x": x,
+            "y": y,
+            "url": url,
+            "success": success,
+            "status_code": resp.status_code,
+            "latency_ms": elapsed * 1000,
+            "size_bytes": size_bytes,
+            "error": None if success else resp.text[:200],
+        }
+    except Exception as e:
+        elapsed = time.perf_counter() - start
+        return {
+            "z": z,
+            "x": x,
+            "y": y,
+            "url": url,
+            "success": False,
+            "status_code": None,
+            "latency_ms": elapsed * 1000,
+            "size_bytes": 0,
+            "error": str(e)[:200],
+        }
+
+
+def benchmark_zoom_level(
+    raster_api: str,
+    collection: str,
+    item_id: str,
+    zoom: int,
+    num_tiles: int,
+    params: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+    """Benchmark multiple tiles at a specific zoom level.
+
+    Args:
+        raster_api: Base raster API URL
+        collection: Collection ID
+        item_id: Item ID
+        zoom: Zoom level
+        num_tiles: Number of tiles to test
+        params: Optional query parameters
+
+    Returns:
+        Aggregated statistics for this zoom level
+    """
+    logger.info(f"Benchmarking zoom level {zoom} ({num_tiles} tiles)")
+    coords = generate_tile_coordinates(zoom, num_tiles)
+
+    results = []
+    for z, x, y in coords:
+        result = benchmark_tile(raster_api, collection, item_id, z, x, y, params)
+        results.append(result)
+        status = "✓" if result["success"] else "✗"
+        logger.debug(
+            f"  {status} z{z}/{x}/{y}: {result['latency_ms']:.1f}ms "
+            f"({result['size_bytes'] / 1024:.1f}KB)"
+        )
+
+    # Calculate statistics
+    successful = [r for r in results if r["success"]]
+    if not successful:
+        logger.warning(f"All tiles failed at zoom {zoom}")
+        return {
+            "zoom": zoom,
+            "num_tiles": num_tiles,
+            "num_successful": 0,
+            "success_rate": 0.0,
+            "latency_ms": None,
+            "results": results,
+        }
+
+    latencies = [r["latency_ms"] for r in successful]
+    sizes = [r["size_bytes"] for r in successful]
+
+    stats = {
+        "zoom": zoom,
+        "num_tiles": num_tiles,
+        "num_successful": len(successful),
+        "success_rate": len(successful) / num_tiles,
+        "latency_ms": {
+            "mean": sum(latencies) / len(latencies),
+            "min": min(latencies),
+            "max": max(latencies),
+            "p50": sorted(latencies)[len(latencies) // 2],
+            "p95": sorted(latencies)[int(len(latencies) * 0.95)],
+        },
+        "size_bytes": {
+            "mean": sum(sizes) / len(sizes),
+            "min": min(sizes),
+            "max": max(sizes),
+        },
+        "results": results,
+    }
+
+    logger.info(
+        f"  Zoom {zoom}: {stats['latency_ms']['mean']:.1f}ms avg, "  # type: ignore[index]
+        f"{stats['latency_ms']['p95']:.1f}ms p95, "
+        f"{stats['success_rate']:.1%} success"
+    )
+
+    return stats
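# Editor's sketch (not part of the diff): what a fully qualified tile request
# looks like once query parameters are appended, mirroring benchmark_tile().
# The collection, item ID, and tile coordinates below are hypothetical; the
# parameters match the S2 defaults set in main().
from urllib.parse import urlencode

params = {"assets": "SR_10m", "asset_as_band": "true", "bidx": "4,3,2", "rescale": "0,3000"}
base = "https://api.explorer.eopf.copernicus.eu/raster/collections/sentinel-2-l2a/items/ITEM_ID"
url = f"{base}/tiles/WebMercatorQuad/12/2159/1434.png?{urlencode(params)}"
# -> .../12/2159/1434.png?assets=SR_10m&asset_as_band=true&bidx=4%2C3%2C2&rescale=0%2C3000
print(url)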
+ "--num-tiles", + type=int, + default=20, + help="Number of tiles to test per zoom level (default: 20)", + ) + parser.add_argument( + "--zoom-levels", + default="10,11,12", + help="Comma-separated zoom levels to test (default: 10,11,12)", + ) + parser.add_argument( + "--assets", + help="Comma-separated asset keys to visualize (e.g., b04,b03,b02)", + ) + parser.add_argument( + "--rescale", + help="Rescale values (e.g., 0,3000)", + ) + parser.add_argument( + "--output", + help="Output JSON file for results", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + if args.verbose: + logger.setLevel(logging.DEBUG) + + # Parse zoom levels + try: + zoom_levels = [int(z.strip()) for z in args.zoom_levels.split(",")] + except ValueError: + logger.error(f"Invalid zoom levels: {args.zoom_levels}") + sys.exit(1) + + # Fetch item metadata + try: + item = fetch_item(args.stac_api, args.collection, args.item_id) + logger.info(f"Item found: {item['id']} in {item['collection']}") + except Exception as e: + logger.error(f"Failed to fetch item: {e}") + sys.exit(1) + + # Build query parameters + params: dict[str, Any] = {} + if args.assets: + params["assets"] = args.assets + elif args.collection.startswith("sentinel-2"): + # Default to RGB composite for S2 + params["assets"] = "SR_10m" + params["asset_as_band"] = "true" + params["bidx"] = "4,3,2" # R,G,B bands from SR_10m + logger.info("Using default S2 RGB assets: SR_10m (bands 4,3,2)") + elif args.collection.startswith("sentinel-1"): + # Default to VV/VH for S1 + params["assets"] = "vv,vh" + logger.info("Using default S1 assets: vv,vh") + + if args.rescale: + params["rescale"] = args.rescale + elif "sentinel-2" in args.collection: + # Default rescale for S2 + params["rescale"] = "0,3000" + logger.info("Using default S2 rescale: 0,3000") + + logger.info(f"Query parameters: {params}") + + # Benchmark each zoom level + all_results = [] + total_start = time.perf_counter() + + for zoom in zoom_levels: + stats = benchmark_zoom_level( + args.raster_api, + args.collection, + args.item_id, + zoom, + args.num_tiles, + params, + ) + all_results.append(stats) + + total_elapsed = time.perf_counter() - total_start + + # Calculate overall statistics + all_successful = [r for stats in all_results for r in stats["results"] if r["success"]] + all_latencies = [r["latency_ms"] for r in all_successful] + + summary = { + "item_id": args.item_id, + "collection": args.collection, + "raster_api": args.raster_api, + "zoom_levels": zoom_levels, + "num_tiles_per_zoom": args.num_tiles, + "total_tiles": len(zoom_levels) * args.num_tiles, + "total_successful": len(all_successful), + "overall_success_rate": len(all_successful) / (len(zoom_levels) * args.num_tiles), + "total_duration_sec": total_elapsed, + "overall_latency_ms": { + "mean": sum(all_latencies) / len(all_latencies) if all_latencies else None, + "min": min(all_latencies) if all_latencies else None, + "max": max(all_latencies) if all_latencies else None, + "p50": sorted(all_latencies)[len(all_latencies) // 2] if all_latencies else None, + "p95": sorted(all_latencies)[int(len(all_latencies) * 0.95)] if all_latencies else None, + }, + "zoom_level_results": all_results, + } + + # Print summary + print("\n" + "=" * 70) + print("TILE PERFORMANCE BENCHMARK SUMMARY") + print("=" * 70) + print(f"Item: {summary['item_id']}") + print(f"Collection: {summary['collection']}") + print(f"Zoom levels: {', '.join(map(str, zoom_levels))}") + print(f"Tiles per zoom: 
+
+    # Print summary
+    print("\n" + "=" * 70)
+    print("TILE PERFORMANCE BENCHMARK SUMMARY")
+    print("=" * 70)
+    print(f"Item:            {summary['item_id']}")
+    print(f"Collection:      {summary['collection']}")
+    print(f"Zoom levels:     {', '.join(map(str, zoom_levels))}")
+    print(f"Tiles per zoom:  {args.num_tiles}")
+    print(f"Total tiles:     {summary['total_tiles']}")
+    print(f"Successful:      {summary['total_successful']} ({summary['overall_success_rate']:.1%})")
+    print(f"Total duration:  {summary['total_duration_sec']:.2f}s")
+    print()
+    if all_latencies:
+        print("Overall Latency:")
+        print(f"  Mean:            {summary['overall_latency_ms']['mean']:.1f}ms")
+        print(f"  Median (p50):    {summary['overall_latency_ms']['p50']:.1f}ms")
+        print(f"  95th percentile: {summary['overall_latency_ms']['p95']:.1f}ms")
+        print(f"  Min:             {summary['overall_latency_ms']['min']:.1f}ms")
+        print(f"  Max:             {summary['overall_latency_ms']['max']:.1f}ms")
+        print()
+    print("Per-Zoom Results:")
+    for stats in all_results:
+        if stats["latency_ms"]:
+            print(
+                f"  z{stats['zoom']:2d}: "
+                f"{stats['latency_ms']['mean']:6.1f}ms avg, "
+                f"{stats['latency_ms']['p95']:6.1f}ms p95, "
+                f"{stats['success_rate']:5.1%} success"
+            )
+        else:
+            print(f"  z{stats['zoom']:2d}: All tiles failed")
+    print("=" * 70)
+
+    # Save to file if requested
+    if args.output:
+        with open(args.output, "w") as f:
+            json.dump(summary, f, indent=2)
+        logger.info(f"Results saved to {args.output}")
+
+
+if __name__ == "__main__":
+    main()
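# Editor's sketch (not part of the diff): comparing two --output summaries,
# e.g. one run against a GeoZarr-backed collection and one against direct
# EOPF access. File names are hypothetical; the key names follow the
# `summary` dict built in main() above.
import json


def load_mean_latency(path: str) -> float:
    with open(path) as f:
        return float(json.load(f)["overall_latency_ms"]["mean"])


geozarr_ms = load_mean_latency("tiles_geozarr.json")
eopf_ms = load_mean_latency("tiles_eopf.json")
print(f"Tile speedup: {eopf_ms / geozarr_ms:.2f}x (GeoZarr {geozarr_ms:.1f}ms vs EOPF {eopf_ms:.1f}ms)")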
diff --git a/scripts/validate_geozarr.py b/scripts/validate_geozarr.py
new file mode 100755
index 0000000..5fa6585
--- /dev/null
+++ b/scripts/validate_geozarr.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python3
+"""Validate GeoZarr compliance and generate quality metrics."""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import subprocess
+import sys
+from datetime import UTC, datetime
+from pathlib import Path
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def validate_geozarr(dataset_path: str, verbose: bool = False) -> dict:
+    """Run eopf-geozarr validate and parse the results.
+
+    Returns:
+        dict with validation status and any errors/warnings
+    """
+    logger.info(f"Validating: {dataset_path}")
+
+    cmd = ["eopf-geozarr", "validate", dataset_path]
+    if verbose:
+        cmd.append("--verbose")
+
+    try:
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            timeout=300,  # 5 minute timeout
+        )
+
+        validation_result = {
+            "valid": result.returncode == 0,
+            "exit_code": result.returncode,
+            "stdout": result.stdout,
+            "stderr": result.stderr,
+        }
+
+        if result.returncode == 0:
+            logger.info("✅ Validation passed")
+        else:
+            logger.error(f"❌ Validation failed (exit code {result.returncode})")
+            if result.stderr:
+                logger.error(f"Errors:\n{result.stderr}")
+
+        return validation_result
+
+    except subprocess.TimeoutExpired:
+        logger.error("❌ Validation timeout (>5 minutes)")
+        return {
+            "valid": False,
+            "exit_code": -1,
+            "error": "Validation timeout",
+        }
+    except Exception as e:
+        logger.error(f"❌ Validation error: {e}")
+        return {
+            "valid": False,
+            "exit_code": -1,
+            "error": str(e),
+        }
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Validate GeoZarr compliance")
+    parser.add_argument("dataset_path", help="Path to GeoZarr dataset (S3 or local)")
+    parser.add_argument("--item-id", help="STAC item ID for tracking")
+    parser.add_argument("--output", help="Output JSON file path")
+    parser.add_argument("--verbose", action="store_true", help="Verbose validation output")
+    args = parser.parse_args()
+
+    # Run validation
+    validation = validate_geozarr(args.dataset_path, args.verbose)
+
+    # Build the complete result
+    result = {
+        "timestamp": datetime.now(UTC).isoformat(),
+        "dataset_path": args.dataset_path,
+        "item_id": args.item_id,
+        "validation": validation,
+    }
+
+    # Write to file if requested
+    if args.output:
+        output_path = Path(args.output)
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        with open(output_path, "w") as f:
+            json.dump(result, f, indent=2)
+        logger.info(f"Results written to: {output_path}")
+
+    # Print summary
+    logger.info("\n" + "=" * 60)
+    logger.info(f"Dataset: {args.dataset_path}")
+    logger.info(f"Valid: {validation['valid']}")
+    if args.item_id:
+        logger.info(f"Item ID: {args.item_id}")
+    logger.info("=" * 60 + "\n")
+
+    # Output JSON for the workflow
+    print(json.dumps(result, indent=2))
+
+    # Exit with the validation status
+    sys.exit(0 if validation["valid"] else 1)
+
+
+if __name__ == "__main__":
+    main()
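# Editor's sketch (not part of the diff): driving the validator from another
# process and consuming its --output JSON. The flags and exit-code contract
# match the argparse setup and sys.exit() above; the dataset path and output
# location are hypothetical.
import json
import subprocess

proc = subprocess.run(
    [
        "python3", "scripts/validate_geozarr.py",
        "s3://bucket/product.zarr",  # hypothetical dataset path
        "--item-id", "my-item",
        "--output", "/tmp/validation.json",
    ],
    check=False,  # a non-zero exit code simply means validation failed
)
with open("/tmp/validation.json") as f:
    report = json.load(f)
print(proc.returncode, report["validation"]["valid"])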
diff --git a/workflows/run-benchmark-test.yaml b/workflows/run-benchmark-test.yaml
new file mode 100644
index 0000000..a35042c
--- /dev/null
+++ b/workflows/run-benchmark-test.yaml
@@ -0,0 +1,51 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: benchmark-test-
+  namespace: devseed-staging
+spec:
+  entrypoint: benchmark
+  arguments:
+    parameters:
+      - name: geozarr_url
+        value: "s3://esa-zarr-sentinel-explorer-fra/tests-output/sentinel-2-l2a-dp-test/e2e-test-recent-1759867528.zarr/measurements/reflectance/r10m"
+      - name: eopf_url
+        value: "https://objects.eodc.eu:443/e05ab01a9d56408d82ac32d69a5aae2a:202510-s02msil2a-eu/07/products/cpm_v256/S2C_MSIL2A_20251007T143111_N0511_R139_T26WME_20251007T154617.zarr/measurements/reflectance/r10m"
+      - name: item_id
+        value: "e2e-test-recent-1759867528-rgb"
+  templates:
+    - name: benchmark
+      container:
+        image: ghcr.io/eopf-explorer/data-pipeline:v25-dataarray
+        command: ["python3"]
+        args:
+          - /app/scripts/benchmark_comparison.py
+          - --geozarr-url={{workflow.parameters.geozarr_url}}
+          - --eopf-url={{workflow.parameters.eopf_url}}
+          - --item-id={{workflow.parameters.item_id}}
+          - --windows=5
+          - --window-size=512
+        env:
+          - name: PYTHONUNBUFFERED
+            value: "1"
+          - name: AWS_ACCESS_KEY_ID
+            valueFrom:
+              secretKeyRef:
+                name: geozarr-s3-credentials
+                key: AWS_ACCESS_KEY_ID
+          - name: AWS_SECRET_ACCESS_KEY
+            valueFrom:
+              secretKeyRef:
+                name: geozarr-s3-credentials
+                key: AWS_SECRET_ACCESS_KEY
+          - name: AWS_ENDPOINT_URL
+            value: "https://s3.de.cloud.ovh.net"
+          - name: ZARR_V3_EXPERIMENTAL_API
+            value: "1"
+        resources:
+          requests:
+            memory: 4Gi
+          limits:
+            memory: 8Gi
+  activeDeadlineSeconds: 600
+  serviceAccountName: operate-workflow-sa
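# Editor's sketch (not part of the diff): submitting this workflow with
# parameter overrides. Assumes the Argo CLI is installed and has access to the
# devseed-staging namespace; the URLs and item ID below are placeholders for
# the defaults defined in spec.arguments.parameters above.
import subprocess

subprocess.run(
    [
        "argo", "submit", "workflows/run-benchmark-test.yaml",
        "-n", "devseed-staging",
        "-p", "geozarr_url=s3://bucket/converted.zarr/measurements/reflectance/r10m",
        "-p", "eopf_url=https://host/product.zarr/measurements/reflectance/r10m",
        "-p", "item_id=my-benchmark-run",
        "--watch",  # stream workflow status until completion
    ],
    check=True,
)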