From d52e21e18315977aec84991249a0c1c7c6f38fe8 Mon Sep 17 00:00:00 2001
From: Wietze
Date: Tue, 7 Oct 2025 21:50:29 -0400
Subject: [PATCH] test: test suite and documentation

Add comprehensive testing and project documentation.

- Unit tests for register_stac and augment_stac_item
- Integration tests for workflow submission
- E2E test configuration
- Project README, CONTRIBUTING, QUICKSTART guides
- CI workflow (GitHub Actions)
---
 .github/workflows/test.yml             |  53 +++++
 CONTRIBUTING.md                        | 295 ++++++++++++++++++++++++
 GETTING_STARTED.md                     | 180 ++++++++++++++
 LICENSE                                | 176 ++++++++++++++
 QUICKSTART_E2E.md                      | 176 +++++++++++++++
 README.md                              |  55 +++--
 tests/conftest.py                      |  62 +++++
 tests/integration/__init__.py          |   1 +
 tests/integration/test_pipeline_e2e.py | 228 +++++++++++++++++++
 tests/unit/__init__.py                 |   1 +
 tests/unit/test_augment_stac_item.py   | 302 +++++++++++++++++++++++++
 tests/unit/test_register_stac.py       | 295 ++++++++++++++++++++++++
 validate-setup.sh                      | 129 +++++++++++
 13 files changed, 1940 insertions(+), 13 deletions(-)
 create mode 100644 .github/workflows/test.yml
 create mode 100644 CONTRIBUTING.md
 create mode 100644 GETTING_STARTED.md
 create mode 100644 LICENSE
 create mode 100644 QUICKSTART_E2E.md
 create mode 100644 tests/conftest.py
 create mode 100644 tests/integration/__init__.py
 create mode 100644 tests/integration/test_pipeline_e2e.py
 create mode 100644 tests/unit/__init__.py
 create mode 100644 tests/unit/test_augment_stac_item.py
 create mode 100644 tests/unit/test_register_stac.py
 create mode 100755 validate-setup.sh

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..a3c424e
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,53 @@
+name: Tests
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.11", "3.12"]
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@v3
+        with:
+          version: "latest"
+
+      - name: Create Python version symlink
+        run: |
+          sudo ln -sf $(which python) /usr/bin/python${{ matrix.python-version }}
+          python${{ matrix.python-version }} --version
+
+      - name: Install dependencies
+        run: uv sync --all-extras
+
+      - name: Set up pre-commit cache
+        uses: actions/cache@v4
+        with:
+          path: ~/.cache/pre-commit
+          key: pre-commit-${{ matrix.python-version }}-${{ hashFiles('.pre-commit-config.yaml') }}
+
+      - name: Run pre-commit checks
+        run: uv run pre-commit run --all-files
+
+      - name: Run tests with coverage
+        run: uv run pytest --cov=scripts --cov-report=xml --cov-report=term
+
+      - name: Upload coverage to Codecov
+        uses: codecov/codecov-action@v4
+        with:
+          files: ./coverage.xml
+          fail_ci_if_error: false
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..b8602a3
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,295 @@
+# Contributing
+
+## Setup
+
+```bash
+git clone https://github.com/EOPF-Explorer/data-pipeline.git
+cd data-pipeline
+uv sync --all-extras
+uv run pre-commit install
+make test
+```
+
+**Requirements:** Python 3.11+, uv, kubectl (for integration tests)
+
+## Testing
+
+```bash
+make test                                   # All tests
+pytest --cov=scripts --cov-report=html      # With coverage
+pytest tests/unit/test_register_stac.py -v  # Specific file
+```
+
+**Coverage goal:** 80%+ on core scripts (current: 25%)
+
+## Code Style
+
+Automated via pre-commit: ruff (lint), ruff-format, mypy (types), yaml-check.
+
+```bash
+uv run pre-commit run --all-files       # All checks
+uv run pre-commit run ruff --all-files  # Specific check
+```
+
+**Type hints required:**
+```python
+def extract_item_id(stac_item: dict[str, Any]) -> str:  # ✅
+    return stac_item["id"]
+```
+
+**Google-style docstrings:**
+```python
+def publish_message(config: Config, payload: dict[str, Any]) -> str:
+    """Publish to RabbitMQ and trigger workflow.
+
+    Args:
+        config: RabbitMQ credentials
+        payload: Workflow payload
+
+    Returns:
+        Item ID
+
+    Raises:
+        RuntimeError: If publish fails or connection times out
+    """
+```
+
+## 📝 Commit Messages
+
+We follow [Conventional Commits](https://www.conventionalcommits.org/):
+
+```bash
+# Format: <type>(<scope>): <subject>
+
+# Types:
+feat: New feature
+fix: Bug fix
+docs: Documentation only
+refactor: Code restructuring (no behavior change)
+test: Adding/updating tests
+chore: Maintenance (dependencies, configs)
+perf: Performance improvement
+ci: CI/CD changes
+
+# Examples:
+git commit -m "feat(stac): add TiTiler viewer links to STAC items"
+git commit -m "fix(workflow): correct S3 credential mounting"
+git commit -m "docs: update README with troubleshooting section"
+git commit -m "test: add integration tests for AMQP publishing"
+```
+
+## 🔄 Pull Request Process
+
+### Before Opening PR
+
+- [ ] All tests pass: `make test`
+- [ ] Pre-commit hooks pass: `uv run pre-commit run --all-files`
+- [ ] Documentation updated (README, docstrings)
+- [ ] CHANGELOG.md updated with changes
+- [ ] Commit messages follow conventional format
+
+### PR Checklist Template
+
+When you open a PR, include:
+
+```markdown
+## Description
+Brief description of what this PR does
+
+## Type of Change
+- [ ] Bug fix (non-breaking change fixing an issue)
+- [ ] New feature (non-breaking change adding functionality)
+- [ ] Breaking change (fix or feature causing existing functionality to change)
+- [ ] Documentation update
+
+## Testing
+- [ ] Tests pass locally (`make test`)
+- [ ] Pre-commit hooks pass
+- [ ] Tested manually (describe steps)
+
+## Screenshots (if applicable)
+Add screenshots for UI/visual changes
+```
+
+### Review Process
+
+1. Automated checks run (tests, linting)
+2. At least one maintainer review required
+3. Address feedback with new commits
+4. Squash-merge after approval
+
+## 🏗️ Project Structure
+
+```
+data-pipeline/
+├── scripts/               # Core pipeline scripts
+│   ├── publish_amqp.py
+│   ├── register_stac.py
+│   └── augment_stac_item.py
+├── workflows/             # Argo Workflow templates
+│   ├── geozarr-convert-template.yaml
+│   └── payload.json
+├── examples/              # Standalone examples and interactive tools
+│   ├── simple_register.py
+│   └── operator.ipynb
+├── tests/                 # Test suite
+│   ├── unit/              # Unit tests
+│   └── integration/       # Integration tests
+├── docker/                # Container definitions
+└── pyproject.toml         # Dependencies and config
+```
+
+## 🐛 Reporting Bugs
+
+### Before Reporting
+
+1. Check existing issues
+2. Verify it's reproducible
+3. Test with latest code
+
+### Bug Report Template
+
+````markdown
+**Describe the bug**
+Clear description of what's wrong
+
+**To Reproduce**
+Steps to reproduce:
+1. Run command '...'
+2. See error
+
+**Expected behavior**
+What should happen
+
+**Environment:**
+- Python version: [e.g., 3.11.5]
+- OS: [e.g., macOS 14.0]
+- Kubernetes version: [e.g., 1.28]
+
+**Logs**
+```
+Paste relevant logs here
+```
+````
+
+## 💡 Feature Requests
+
+We welcome feature ideas! Please:
+
+1. Check if similar request exists
+2. Describe use case clearly
+3. 
Explain expected behavior
+4. Consider implementation approach
+
+## 📚 Documentation
+
+### README Updates
+
+When adding features, update:
+- Quick Start section
+- Usage examples
+- Configuration options
+- Troubleshooting
+
+### Inline Documentation
+
+- Add docstrings to all public functions
+- Include type hints
+- Explain non-obvious logic with comments
+- Link to related documentation
+
+## 🧑‍💻 Development Workflow
+
+### Local Development Loop
+
+```bash
+# 1. Create feature branch
+git checkout -b feat/my-feature
+
+# 2. Make changes
+# ... edit files ...
+
+# 3. Run tests
+make test
+
+# 4. Format and lint
+uv run pre-commit run --all-files
+
+# 5. Commit
+git add .
+git commit -m "feat: add my feature"
+
+# 6. Push and open PR
+git push origin feat/my-feature
+```
+
+### Testing Changes
+
+**For script changes:**
+```bash
+# Unit tests
+pytest tests/unit/test_my_script.py -v
+
+# Integration test (requires cluster)
+make test-e2e
+```
+
+**For workflow changes:**
+```bash
+# Deploy to test namespace
+kubectl apply -f workflows/geozarr-convert-template.yaml -n test
+
+# Trigger test run
+kubectl create -f workflows/test-run.yaml -n test
+```
+
+**For notebook changes:**
+```bash
+# Launch notebook
+make demo
+
+# Test cells manually
+# Verify outputs match expected results
+```
+
+## 🔐 Security
+
+### Credentials
+
+**Never commit:**
+- API keys
+- S3 credentials
+- RabbitMQ passwords
+- kubeconfig files
+
+**Use instead:**
+- Kubernetes secrets
+- Environment variables
+- `.env` files (in `.gitignore`)
+
+### Reporting Vulnerabilities
+
+Email security issues to: security@eopf-explorer.eu
+
+## 📞 Getting Help
+
+- **Questions**: Open a [GitHub Discussion](https://github.com/EOPF-Explorer/data-pipeline/discussions)
+- **Bugs**: Open an [Issue](https://github.com/EOPF-Explorer/data-pipeline/issues)
+- **Chat**: Join our Slack channel (request invite)
+
+## 🎓 Learning Resources
+
+### Recommended Reading
+
+- [STAC Specification](https://stacspec.org/)
+- [GeoZarr Spec](https://github.com/zarr-developers/geozarr-spec)
+- [Argo Workflows Docs](https://argo-workflows.readthedocs.io/)
+- [TiTiler Documentation](https://developmentseed.org/titiler/)
+
+### Example Workflows
+
+See `examples/operator.ipynb` for a complete workflow example.
+
+## 🙏 Thank You!
+
+Your contributions make this project better for everyone. We appreciate your time and effort! 🚀
diff --git a/GETTING_STARTED.md b/GETTING_STARTED.md
new file mode 100644
index 0000000..5f9afa6
--- /dev/null
+++ b/GETTING_STARTED.md
@@ -0,0 +1,180 @@
+# Getting Started
+
+Setup guide for running GeoZarr conversions (15 minutes).
+
+## Overview
+
+Converts Sentinel-2 Zarr to cloud-optimized GeoZarr with web visualization.
+
+**Input:** STAC item URL
+**Output:** Interactive map at `https://api.explorer.eopf.copernicus.eu/raster/viewer?url=...`
+
+## Prerequisites
+
+**Required:**
+- OVH Kubernetes cluster access (managed by platform-deploy)
+- Python 3.11+ and kubectl on local machine
+
+**Not required:**
+- Docker, deep Kubernetes knowledge, Argo Workflows expertise
+
+## Step 1: Configure kubectl
+
+Download kubeconfig from [OVH Manager](https://www.ovh.com/manager/#/public-cloud/pci/projects/bcc5927763514f499be7dff5af781d57/kubernetes/f5f25708-bd15-45b9-864e-602a769a5fcf/service) (Access and security → kubeconfig). 
+ +```bash +mkdir -p .work +mv ~/Downloads/kubeconfig-*.yml .work/kubeconfig +export KUBECONFIG=$(pwd)/.work/kubeconfig +echo "export KUBECONFIG=$(pwd)/.work/kubeconfig" >> ~/.zshrc + +kubectl get nodes # Should list 3-5 nodes +``` + +## Step 2: Install Dependencies + +```bash +# Using uv (recommended) +curl -LsSf https://astral.sh/uv/install.sh | sh +uv sync --all-extras + +# Or using pip +pip install pika click requests +``` + +## Step 3: Deploy Infrastructure + +```bash +kubectl apply -f workflows/rbac.yaml -n devseed +kubectl apply -f workflows/eventsource.yaml -n devseed +kubectl apply -f workflows/sensor.yaml -n devseed +kubectl apply -f workflows/template.yaml -n devseed + +# Verify +./validate-setup.sh +``` + +Deploys: RBAC permissions, RabbitMQ event source, workflow trigger sensor, conversion template. + +## Step 4: Submit Job + +```bash +# Port-forward RabbitMQ and submit in one command +kubectl port-forward -n core svc/rabbitmq 5672:5672 & +sleep 2 +export AMQP_URL="amqp://user:$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)@localhost:5672/" +uv run python examples/submit.py \ + --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2B_MSIL2A_20250518T112119_N0511_R037_T29RLL_20250518T140519" \ + --collection "sentinel-2-l2a-dp-test" \ + --item-id "test-$(date +%s)" +``` + +## Step 5: Monitor Workflow + +```bash +# Watch latest workflow (5-7 min conversion time) +sleep 10 +kubectl get workflows -n devseed --sort-by=.metadata.creationTimestamp -o name | tail -1 | \ + xargs -I {} kubectl get {} -n devseed -w +``` + +**States:** Running (converting), Succeeded (done), Failed (check logs below) + +## Step 6: View Result + +```bash +# Use your item ID from Step 4 (e.g., test-1728315678) +ITEM_ID="YOUR_ITEM_ID" + +# View in browser +open "https://api.explorer.eopf.copernicus.eu/raster/viewer?url=https://api.explorer.eopf.copernicus.eu/stac/collections/sentinel-2-l2a-dp-test/items/${ITEM_ID}" + +# Or get STAC metadata +curl "https://api.explorer.eopf.copernicus.eu/stac/collections/sentinel-2-l2a-dp-test/items/${ITEM_ID}" | jq . +``` + +## Next Steps + +**Batch processing:** +```bash +curl "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items?limit=10" | \ + jq -r '.features[].id' | \ + xargs -I {} uv run python examples/submit.py --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/{}" --collection "sentinel-2-l2a-dp-test" +``` + +**Jupyter notebook:** `jupyter lab notebooks/operator.ipynb` for interactive operations. + +**Custom payloads:** Edit `workflows/payload.json` (groups, spatial_chunk, tile_width), then `--payload workflows/payload.json`. 
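+
+A minimal sketch of such a payload, written from Python rather than by hand. The key names come from the parameters listed above; the values and nesting are assumptions, so treat the repo's `workflows/payload.json` as the authoritative schema:
+
+```python
+# Sketch: write a custom payload for --payload (all values are assumptions).
+import json
+
+payload = {
+    "groups": ["/measurements/reflectance/r10m"],  # Zarr groups to convert (assumed)
+    "spatial_chunk": 4096,  # chunk size in pixels (assumed)
+    "tile_width": 256,  # tile size for the web viewer (assumed)
+}
+
+with open("workflows/payload.json", "w") as f:
+    json.dump(payload, f, indent=2)
+```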
+
+## Troubleshooting
+
+**Workflow failed:** Check logs:
+```bash
+WORKFLOW=$(kubectl get workflows -n devseed --sort-by=.metadata.creationTimestamp -o name | tail -1)
+kubectl logs -n devseed -l workflows.argoproj.io/workflow=$(basename $WORKFLOW) --tail=100
+```
+
+**No workflow created:** Check sensor/eventsource:
+```bash
+kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50
+```
+
+**Connection issues:** Ensure port-forward is running: `kubectl port-forward -n core svc/rabbitmq 5672:5672 &`
+
+## Advanced
+
+**Monitor all workflows:**
+```bash
+watch -n 2 'kubectl get workflows -n devseed --sort-by=.metadata.creationTimestamp | tail -20'
+```
+
+**Cleanup succeeded workflows (>7 days):**
+```bash
+kubectl delete workflows -n devseed \
+  $(kubectl get workflows -n devseed -o json | jq -r '.items[] | select(.status.phase == "Succeeded" and (.metadata.creationTimestamp | fromdateiso8601 < (now - 604800))) | .metadata.name')
+```
+
+## Architecture
+
+```
+submit.py → RabbitMQ → Sensor → Argo Workflow (convert → register → augment) → S3 + STAC
+```
+
+**Components:**
+- STAC Item: Satellite metadata (JSON)
+- GeoZarr: Cloud-optimized geospatial format
+- AMQP: Message queue protocol
+- Sensor: Event-driven workflow trigger
+
+**Resources:**
+- Docs: [README.md](README.md)
+- Tools: [examples/README.md](examples/README.md)
+
+## Web UIs
+
+All bundled in EOxHub workspace: **https://workspace.devseed.hub-eopf-explorer.eox.at/**
+
+**Login to EOxHub for authenticated access to:**
+- Argo Workflows: Monitor pipeline execution
+- STAC Browser: Catalog exploration
+
+**Direct URLs (login through EOxHub first):**
+- Argo UI: https://argo-workflows.hub-eopf-explorer.eox.at
+- STAC API: https://api.explorer.eopf.copernicus.eu/stac
+- Raster API: https://api.explorer.eopf.copernicus.eu/raster
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d0381d6
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,176 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
diff --git a/QUICKSTART_E2E.md b/QUICKSTART_E2E.md
new file mode 100644
index 0000000..1c69b32
--- /dev/null
+++ b/QUICKSTART_E2E.md
@@ -0,0 +1,176 @@
+# Quick Start: End-to-End GeoZarr Pipeline
+
+Complete a full GeoZarr conversion from STAC item to interactive web map in ~10 minutes.
+
+## Prerequisites
+
+- Kubernetes cluster with data-pipeline deployed
+- kubectl configured with proper context
+- Python 3.11+ with `pika` and `click` installed:
+  ```bash
+  pip install pika click
+  # OR if using uv in the repo:
+  cd data-pipeline && uv sync
+  ```
+
+## One-Command Test
+
+```bash
+# Port-forward RabbitMQ, publish message, and monitor
+export RABBITMQ_PASSWORD=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)
+kubectl port-forward -n core svc/rabbitmq 5672:5672 >/dev/null 2>&1 &
+sleep 2
+
+# Submit job
+ITEM_ID="quickstart-test-$(date +%s)"
+python3 examples/submit.py \
+  --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2C_MSIL2A_20251007T125311_N0511_R138_T27WXN_20251007T141722" \
+  --item-id "$ITEM_ID" \
+  --collection "sentinel-2-l2a-dp-test" \
+  --amqp-url "amqp://user:${RABBITMQ_PASSWORD}@localhost:5672/"
+
+# Get workflow (wait 10s for sensor to trigger)
+sleep 10
+WORKFLOW=$(kubectl get workflows -n devseed --sort-by=.metadata.creationTimestamp -o name | tail -1 | cut -d'/' -f2)
+echo "✅ Workflow: $WORKFLOW"
+echo "🔗 Argo UI: https://argo-workflows.hub-eopf-explorer.eox.at/workflows/devseed/$WORKFLOW"
+
+# Monitor (workflow takes ~5-10 minutes)
+kubectl get workflow $WORKFLOW -n devseed -w
+```
+
+## Step-by-Step Guide
+
+### 1. Verify Infrastructure
+
+```bash
+kubectl get eventsource rabbitmq-geozarr -n devseed
+kubectl get sensor geozarr-sensor -n devseed
+kubectl get workflowtemplate geozarr-pipeline -n devseed
+```
+
+All should exist without errors (AGE column shows they're deployed).
+
+### 2. 
Publish AMQP Message + +```bash +# Get RabbitMQ password +export RABBITMQ_PASSWORD=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d) + +# Port-forward RabbitMQ +kubectl port-forward -n core svc/rabbitmq 5672:5672 & + +# Submit job with unique ID +ITEM_ID="test-$(date +%s)" +python3 examples/submit.py \ + --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2C_MSIL2A_20251007T125311_N0511_R138_T27WXN_20251007T141722" \ + --item-id "$ITEM_ID" \ + --collection "sentinel-2-l2a-dp-test" \ + --amqp-url "amqp://user:${RABBITMQ_PASSWORD}@localhost:5672/" + +echo "Submitted with item_id: $ITEM_ID" +``` + +### 3. Find Workflow + +Wait 10 seconds for sensor to trigger, then get workflow: + +```bash +sleep 10 +WORKFLOW=$(kubectl get workflows -n devseed --sort-by=.metadata.creationTimestamp -o name | tail -1 | cut -d'/' -f2) +echo "Workflow: $WORKFLOW" + +# Verify it was created by sensor (should show operate-workflow-sa) +kubectl get workflow $WORKFLOW -n devseed -o jsonpath='{.metadata.labels.workflows\.argoproj\.io/creator}' +``` + +### 4. Monitor Execution + +**Watch workflow status:** +```bash +kubectl get workflow $WORKFLOW -n devseed -w +``` + +**Check step progress:** +```bash +kubectl get workflow $WORKFLOW -n devseed -o jsonpath='{.status.nodes}' | \ + jq -r 'to_entries[] | "\(.value.displayName)\t\(.value.phase)"' | column -t +``` + +**View logs (once pods are running):** +```bash +# All steps +kubectl logs -n devseed -l workflows.argoproj.io/workflow=$WORKFLOW -f --prefix + +# Convert step only +kubectl logs -n devseed -l workflows.argoproj.io/workflow=$WORKFLOW,workflows.argoproj.io/template=convert-geozarr -c main -f +``` + +### 5. Verify Results + +**Wait for completion** (5-10 minutes): +```bash +kubectl wait --for=condition=Completed --timeout=15m workflow/$WORKFLOW -n devseed +``` + +**Check STAC registration:** +```bash +ITEM_ID=$(kubectl get workflow $WORKFLOW -n devseed -o jsonpath='{.spec.arguments.parameters[?(@.name=="item_id")].value}') + +curl -s "https://api.explorer.eopf.copernicus.eu/stac/collections/sentinel-2-l2a-dp-test/items/$ITEM_ID" | jq '{ + id: .id, + assets: (.assets | length), + viewer: [.links[] | select(.rel=="viewer") | .href][0] +}' +``` + +## Argo UI + +View in browser: +``` +https://argo-workflows.hub-eopf-explorer.eox.at/workflows/devseed/ +``` + +Workflows created via AMQP → Sensor are visible (sensor uses service account authentication). + +See [docs/ARGO_UI_VISIBILITY.md](docs/ARGO_UI_VISIBILITY.md) for details. + +## Workflow Steps + +The pipeline executes three steps: + +1. **convert-geozarr** - Convert Zarr to GeoZarr with tiling (~5 min) +2. **register-stac** - Register as STAC item (~30 sec) +3. 
**augment-stac** - Add viewer/XYZ/TileJSON links (~10 sec) + +## Troubleshooting + +**Workflow not created:** +```bash +# Check sensor logs +kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50 + +# Check EventSource +kubectl logs -n devseed -l eventsource-name=rabbitmq-geozarr --tail=50 +``` + +**Workflow failed:** +```bash +# Get error details +kubectl describe workflow $WORKFLOW -n devseed + +# Check pod logs +kubectl logs -n devseed -l workflows.argoproj.io/workflow=$WORKFLOW --tail=200 +``` + +**STAC item not found:** +- Verify workflow succeeded: `kubectl get workflow $WORKFLOW -n devseed` +- Check register step logs +- Confirm collection exists: `curl -s https://api.explorer.eopf.copernicus.eu/stac/collections/sentinel-2-l2a-dp-test` + +## Success Criteria + +✅ Workflow Status: Succeeded +✅ All 3 steps completed +✅ STAC item has 20+ assets +✅ Viewer, XYZ, TileJSON links present diff --git a/README.md b/README.md index 80a5617..4fdaade 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,37 @@ -# Data Pipeline# EOPF GeoZarr Data Pipeline +# EOPF GeoZarr Data Pipeline +Automated pipeline for converting Sentinel-2 Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration and interactive visualization. +## Quick Reference -GeoZarr conversion pipeline for EOPF data processing.Automated pipeline for converting Sentinel-2 Zarr datasets to cloud-optimized GeoZarr format with STAC catalog integration and interactive visualization. +```bash +# 1. Submit a workflow (simplest method) +uv run python examples/submit.py --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2B_..." +# 2. Monitor progress +kubectl get wf -n devseed -w +# 3. View result +# Check logs for viewer URL: https://api.explorer.eopf.copernicus.eu/raster/viewer?url=... 
+``` -## Features[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) +💡 **Local testing:** Port-forward RabbitMQ first: `kubectl port-forward -n core svc/rabbitmq 5672:5672 &` -- STAC item registration with retry logic[![Python](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/) +## Features -- GeoZarr format conversion[![Tests](https://github.com/EOPF-Explorer/data-pipeline/workflows/Tests/badge.svg)](https://github.com/EOPF-Explorer/data-pipeline/actions) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) +[![Python](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/) +[![Tests](https://github.com/EOPF-Explorer/data-pipeline/workflows/Tests/badge.svg)](https://github.com/EOPF-Explorer/data-pipeline/actions) +- STAC item registration with retry logic +- GeoZarr format conversion - Cloud-native workflows ## What It Does -## Development - -```bashTransforms Sentinel-2 satellite data into web-ready visualizations: +Transforms Sentinel-2 satellite data into web-ready visualizations: -uv sync --all-extras - -uv run pytest**Input:** STAC item URL → **Output:** Interactive web map (~5-10 min) - -``` +**Input:** STAC item URL → **Output:** Interactive web map (~5-10 min) **Pipeline:** Convert (5 min) → Register (30 sec) → Augment (10 sec) @@ -96,6 +103,28 @@ STAC URL → submit.py → RabbitMQ → AMQP Sensor → Argo Workflow **Automation:** New Sentinel-2 data publishes to RabbitMQ → Pipeline runs automatically +## Submitting Workflows + +**Choose your approach:** + +| Method | Best For | Documentation | +|--------|----------|---------------| +| 🎯 **CLI tool** | Quick testing, automation | [examples/README.md](examples/README.md) | +| 📓 **Jupyter notebook** | Learning, exploration | [notebooks/README.md](notebooks/README.md) | +| ⚡ **Event-driven** | Production (auto) | Already running! | +| 🔧 **Custom pika** | Custom integrations | [See Configuration](#configuration) | + +**Quick example:** +```bash +uv run python examples/submit.py --stac-url "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2B_..." 
+``` + +**Monitor:** +```bash +kubectl get wf -n devseed -w # Watch workflows +kubectl logs -n devseed -l sensor-name=geozarr-sensor --tail=50 # Sensor logs +``` + ### Related Projects - **[data-model](https://github.com/EOPF-Explorer/data-model)** - `eopf-geozarr` conversion library (Python) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5be0a47 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,62 @@ +"""Pytest configuration and shared fixtures for data-pipeline tests.""" + +import pytest + + +@pytest.fixture +def sample_stac_item(): + """Return a minimal STAC item for testing.""" + return { + "type": "Feature", + "stac_version": "1.0.0", + "id": "test-item", + "properties": { + "datetime": "2025-01-01T00:00:00Z", + "proj:epsg": 32636, + }, + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [600000, 6290220], + [709800, 6290220], + [709800, 6400020], + [600000, 6400020], + [600000, 6290220], + ] + ], + }, + "links": [], + "assets": { + "B01": { + "href": "s3://bucket/data/B01.tif", + "type": "image/tiff; application=geotiff", + "roles": ["data"], + "proj:epsg": 32636, + "proj:shape": [10980, 10980], + "proj:transform": [10, 0, 600000, 0, -10, 6400020], + } + }, + "collection": "test-collection", + } + + +@pytest.fixture +def stac_item_with_proj_code(sample_stac_item): + """Return a STAC item with proj:code (should be removed).""" + item = sample_stac_item.copy() + item["properties"]["proj:code"] = "EPSG:32636" + item["assets"]["B01"]["proj:code"] = "EPSG:32636" + return item + + +@pytest.fixture +def mock_zarr_url(): + """Return a sample GeoZarr URL.""" + return "s3://bucket/path/to/dataset.zarr" + + +@pytest.fixture +def mock_stac_api_url(): + """Return a mock STAC API URL.""" + return "https://api.example.com/stac" diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..75e32a2 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1 @@ +"""Integration tests for data-pipeline.""" diff --git a/tests/integration/test_pipeline_e2e.py b/tests/integration/test_pipeline_e2e.py new file mode 100644 index 0000000..d31b243 --- /dev/null +++ b/tests/integration/test_pipeline_e2e.py @@ -0,0 +1,228 @@ +"""Integration tests for end-to-end pipeline flow. + +Tests the full workflow: +1. Extract metadata from source Zarr +2. Register GeoZarr to STAC API +3. Augment item with preview links +4. 
Validate final STAC item +""" + +import os +from unittest.mock import Mock, patch + +import httpx +import pytest + + +@pytest.fixture +def mock_stac_api_responses(): + """Mock STAC API responses for integration tests.""" + return { + "post_item": { + "id": "test-item-123", + "collection": "sentinel-2-l2a", + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": [[]]}, + "properties": {"datetime": "2025-01-01T00:00:00Z"}, + "assets": {"eopf:zarr": {"href": "s3://bucket/test.zarr"}}, + "links": [], + }, + "get_item": { + "id": "test-item-123", + "collection": "sentinel-2-l2a", + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": [[]]}, + "properties": {"datetime": "2025-01-01T00:00:00Z", "proj:epsg": 32636}, + "assets": { + "eopf:zarr": {"href": "s3://bucket/test.zarr", "roles": ["data"]}, + "visual": { + "href": "https://example.com/cog.tif", + "type": "image/tiff", + "roles": ["visual"], + }, + }, + "links": [], + }, + "patch_item": { + "id": "test-item-123", + "collection": "sentinel-2-l2a", + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": [[]]}, + "properties": {"datetime": "2025-01-01T00:00:00Z", "proj:epsg": 32636}, + "assets": { + "eopf:zarr": {"href": "s3://bucket/test.zarr", "roles": ["data"]}, + "visual": { + "href": "https://example.com/cog.tif", + "type": "image/tiff", + "roles": ["visual", "overview"], + }, + }, + "links": [ + { + "rel": "xyz", + "href": "https://titiler.example.com/tiles/...", + "type": "application/json", + } + ], + }, + } + + +@pytest.mark.integration +def test_full_pipeline_flow(sample_stac_item, mock_stac_api_responses): + """Test complete pipeline: extract → register → augment → validate.""" + from scripts.register_stac import create_geozarr_item, register_item + + # Step 1: Create GeoZarr STAC item + geozarr_item = create_geozarr_item( + source_item=sample_stac_item, + geozarr_url="s3://bucket/output.zarr", + item_id="test-item-123", + collection_id="sentinel-2-l2a", + s3_endpoint="https://s3.example.com", + ) + + assert geozarr_item["id"] == "test-item-123" + assert geozarr_item["collection"] == "sentinel-2-l2a" + # Verify assets rewritten (not eopf:zarr, but existing band assets) + assert "assets" in geozarr_item + assert len(geozarr_item["assets"]) > 0 + + # Step 2: Mock register to STAC API + with patch("httpx.Client") as mock_client: + mock_response_get = Mock(status_code=404) + mock_response_post = Mock( + status_code=201, + json=lambda: mock_stac_api_responses["post_item"], + ) + + mock_client_instance = Mock() + mock_client_instance.get.return_value = mock_response_get + mock_client_instance.post.return_value = mock_response_post + mock_client_instance.__enter__ = Mock(return_value=mock_client_instance) + mock_client_instance.__exit__ = Mock(return_value=False) + mock_client.return_value = mock_client_instance + + register_item( + stac_url="https://stac.example.com", + collection_id="sentinel-2-l2a", + item=geozarr_item, + mode="create-or-skip", + ) + + assert mock_client_instance.post.called + post_args = mock_client_instance.post.call_args + assert "sentinel-2-l2a/items" in str(post_args) + + # Step 3: Verify item structure ready for augmentation + # (Augmentation happens via CLI script in real pipeline) + # Band assets should be rewritten to GeoZarr location + for asset in geozarr_item["assets"].values(): + if isinstance(asset, dict) and "href" in asset: + assert asset["href"].startswith("https://") or asset["href"].startswith("s3://") + # Verify roles exist + assert "roles" in asset + + 
+@pytest.mark.integration +def test_registration_error_handling(): + """Test error handling during STAC registration.""" + from scripts.register_stac import register_item + + test_item = { + "id": "test", + "collection": "test-collection", + "type": "Feature", + "geometry": None, + "properties": {}, + "assets": {}, + } + + with patch("httpx.Client") as mock_client: + mock_response_get = Mock(status_code=404) + mock_response_post = Mock(status_code=400, text="Bad Request") + mock_response_post.raise_for_status = Mock( + side_effect=httpx.HTTPStatusError( + "Bad Request", request=Mock(), response=mock_response_post + ) + ) + + mock_client_instance = Mock() + mock_client_instance.get.return_value = mock_response_get + mock_client_instance.post.return_value = mock_response_post + mock_client_instance.__enter__ = Mock(return_value=mock_client_instance) + mock_client_instance.__exit__ = Mock(return_value=False) + mock_client.return_value = mock_client_instance + + with pytest.raises(httpx.HTTPStatusError): + register_item( + stac_url="https://stac.example.com", + collection_id="test-collection", + item=test_item, + mode="create-or-skip", + ) + + +@pytest.mark.integration +@pytest.mark.skipif( + not os.getenv("STAC_API_URL"), reason="Requires real STAC API (set STAC_API_URL)" +) +def test_real_stac_api_connection(): + """Test actual connection to STAC API (optional, requires credentials).""" + import httpx + + stac_api_url = os.getenv("STAC_API_URL") + + # Test GET /collections + response = httpx.get(f"{stac_api_url}/collections", timeout=10.0) + assert response.status_code == 200 + + collections = response.json() + assert "collections" in collections or isinstance(collections, list) + + +@pytest.mark.integration +def test_pipeline_with_s3_urls(): + """Test pipeline handles S3 URLs correctly.""" + from scripts.register_stac import create_geozarr_item, s3_to_https + + # Test S3 URL conversion + s3_url = "s3://eopf-bucket/geozarr/S2A_test.zarr" + https_url = s3_to_https(s3_url, "https://s3.gra.cloud.ovh.net") + + assert https_url.startswith("https://") + assert "eopf-bucket" in https_url + assert "s3.gra.cloud.ovh.net" in https_url + + # Test item with zarr base URL (source → output rewriting) + source_item = { + "type": "Feature", + "id": "test-source", + "properties": {"datetime": "2025-01-01T00:00:00Z"}, + "geometry": None, + "collection": "test", + "assets": { + "B01": { + "href": "s3://source-bucket/data.zarr/B01.tif", + "type": "image/tiff", + "roles": ["data"], + } + }, + } + + item = create_geozarr_item( + source_item=source_item, + geozarr_url=s3_url, + item_id="test-s3-item", + collection_id="sentinel-2-l2a", + s3_endpoint="https://s3.gra.cloud.ovh.net", + ) + + # Verify asset hrefs rewritten from source .zarr to output .zarr + for asset in item["assets"].values(): + if isinstance(asset, dict) and "href" in asset: + # Should reference output geozarr location + assert "eopf-bucket" in asset["href"] or asset["href"].startswith("s3://source") + # If rewritten, should be HTTPS + if "eopf-bucket" in asset["href"]: + assert asset["href"].startswith("https://") diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..35c2daf --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1 @@ +"""Unit tests for data-pipeline.""" diff --git a/tests/unit/test_augment_stac_item.py b/tests/unit/test_augment_stac_item.py new file mode 100644 index 0000000..76ba75a --- /dev/null +++ b/tests/unit/test_augment_stac_item.py @@ -0,0 +1,302 @@ +"""Unit tests for 
augment_stac_item.py.""" + + +def test_encode_true_color_query(): + """Test true color query string encoding.""" + from scripts.augment_stac_item import _encode_true_color_query + + result = _encode_true_color_query("0,0.1") + + # Should include all 3 bands (URL encoded) + assert "variables=%2Fmeasurements%2Freflectance%2Fr10m%2F0%3Ab04" in result + assert "variables=%2Fmeasurements%2Freflectance%2Fr10m%2F0%3Ab03" in result + assert "variables=%2Fmeasurements%2Freflectance%2Fr10m%2F0%3Ab02" in result + assert "rescale=0%2C0.1" in result + assert "color_formula=Gamma+RGB+1.4" in result + + +def test_encode_quicklook_query(): + """Test quicklook query string encoding.""" + from scripts.augment_stac_item import _encode_quicklook_query + + result = _encode_quicklook_query() + + # Should reference TCI (URL encoded) + assert "variables=%2Fquality%2Fl2a_quicklook%2Fr10m%3Atci" in result + assert "bidx=1" in result + assert "bidx=2" in result + assert "bidx=3" in result + + +def test_coerce_epsg_from_string(): + """Test EPSG code coercion from string.""" + from scripts.augment_stac_item import _coerce_epsg + + assert _coerce_epsg("32636") == 32636 + assert _coerce_epsg("EPSG:32636") == 32636 + assert _coerce_epsg("epsg:32636") == 32636 + + +def test_coerce_epsg_invalid(): + """Test EPSG code coercion returns None for invalid input.""" + from scripts.augment_stac_item import _coerce_epsg + + assert _coerce_epsg(None) is None + assert _coerce_epsg("") is None + assert _coerce_epsg("invalid") is None + assert _coerce_epsg(True) is None + + +def test_resolve_preview_query_default(): + """Test preview query resolution uses default when env is None.""" + from scripts.augment_stac_item import _resolve_preview_query + + result = _resolve_preview_query(None, default_query="default") + assert result == "default" + + +def test_resolve_preview_query_custom(): + """Test preview query resolution uses env value when provided.""" + from scripts.augment_stac_item import _resolve_preview_query + + result = _resolve_preview_query("custom=value", default_query="default") + assert result == "custom=value" + + +def test_resolve_preview_query_strips_whitespace(): + """Test preview query resolution strips whitespace.""" + from scripts.augment_stac_item import _resolve_preview_query + + result = _resolve_preview_query(" custom=value ", default_query="default") + assert result == "custom=value" + + +def test_normalize_collection_slug(): + """Test collection ID normalization.""" + from scripts.augment_stac_item import _normalize_collection_slug + + assert _normalize_collection_slug("sentinel-2-l2a") == "sentinel-2-l2a" + assert _normalize_collection_slug("Sentinel 2 L2A") == "sentinel 2 l2a" + assert _normalize_collection_slug("SENTINEL_2_L2A") == "sentinel_2_l2a" + + +def test_normalize_href_scheme_s3_passthrough(): + """Test that s3:// URLs pass through unchanged.""" + from scripts.augment_stac_item import normalize_href_scheme + + assert normalize_href_scheme("s3://mybucket/data.zarr") == "s3://mybucket/data.zarr" + + +def test_normalize_href_scheme_ovh_s3_subdomain(): + """Test OVH S3 virtual-hosted style URL normalization.""" + from scripts.augment_stac_item import normalize_href_scheme + + result = normalize_href_scheme("https://mybucket.s3.gra.cloud.ovh.net/path/to/data.zarr") + assert result == "s3://mybucket/path/to/data.zarr" + + +def test_normalize_href_scheme_ovh_s3_path_style(): + """Test OVH S3 path-style URL normalization.""" + from scripts.augment_stac_item import normalize_href_scheme + + result = 
normalize_href_scheme("https://s3.gra.cloud.ovh.net/mybucket/path/to/data.zarr") + assert result == "s3://mybucket/path/to/data.zarr" + + +def test_normalize_href_scheme_ovh_io_subdomain(): + """Test OVH IO Cloud virtual-hosted style URL normalization.""" + from scripts.augment_stac_item import normalize_href_scheme + + result = normalize_href_scheme("https://mybucket.s3.io.cloud.ovh.net/data.zarr") + assert result == "s3://mybucket/data.zarr" + + +def test_normalize_href_scheme_non_ovh_unchanged(): + """Test non-OVH URLs remain unchanged.""" + from scripts.augment_stac_item import normalize_href_scheme + + url = "https://example.com/data.zarr" + assert normalize_href_scheme(url) == url + + +def test_normalize_href_scheme_invalid_scheme(): + """Test non-http(s) schemes remain unchanged.""" + from scripts.augment_stac_item import normalize_href_scheme + + ftp_url = "ftp://example.com/data.zarr" + assert normalize_href_scheme(ftp_url) == ftp_url + + +def test_resolve_preview_asset_href_converts_preview(): + """Test preview path resolution to full-resolution dataset.""" + from scripts.augment_stac_item import resolve_preview_asset_href + + preview = "s3://bucket/previews/S2B_MSIL2A_20250518_preview.zarr/measurements/b04" + result = resolve_preview_asset_href(preview) + assert result == "s3://bucket/sentinel-2-l2a/S2B_MSIL2A_20250518.zarr/measurements/b04" + + +def test_resolve_preview_asset_href_passthrough_full_res(): + """Test full-resolution paths remain unchanged.""" + from scripts.augment_stac_item import resolve_preview_asset_href + + full = "s3://bucket/sentinel-2-l2a/S2B_MSIL2A_20250518.zarr/measurements/b04" + assert resolve_preview_asset_href(full) == full + + +def test_resolve_preview_asset_href_passthrough_no_preview_suffix(): + """Test paths in previews directory without _preview.zarr suffix remain unchanged.""" + from scripts.augment_stac_item import resolve_preview_asset_href + + no_suffix = "s3://bucket/previews/S2B_MSIL2A_20250518.zarr/data" + assert resolve_preview_asset_href(no_suffix) == no_suffix + + +def test_resolve_preview_asset_href_passthrough_non_s3(): + """Test non-S3 URLs remain unchanged.""" + from scripts.augment_stac_item import resolve_preview_asset_href + + https_url = "https://example.com/previews/data_preview.zarr/b04" + assert resolve_preview_asset_href(https_url) == https_url + + +def test_resolve_preview_asset_href_malformed_path(): + """Test malformed preview paths return original href.""" + from scripts.augment_stac_item import resolve_preview_asset_href + + # Missing store name after previews/ + malformed = "s3://bucket/previews/" + assert resolve_preview_asset_href(malformed) == malformed + + +def test_normalize_asset_alternate_schemes_normalizes_s3(): + """Test alternate hrefs are normalized to s3:// scheme.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset( + href="s3://bucket/data.zarr", + extra_fields={ + "alternate": { + "s3": {"href": "https://bucket.s3.gra.io.cloud.ovh.net/data.zarr"}, + "https": {"href": "https://example.com/data.zarr"}, + } + }, + ) + + normalize_asset_alternate_schemes(asset) + + alternates = asset.extra_fields.get("alternate", {}) + assert alternates["s3"]["href"] == "s3://bucket/data.zarr" + assert alternates["https"]["href"] == "https://example.com/data.zarr" + + +def test_normalize_asset_alternate_schemes_resolves_previews(): + """Test alternate preview paths are resolved to full datasets.""" + from pystac import Asset + + from 
scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset( + href="s3://bucket/sentinel-2-l2a/data.zarr", + extra_fields={ + "alternate": { + "s3": {"href": "s3://bucket/previews/data_preview.zarr"}, + } + }, + ) + + normalize_asset_alternate_schemes(asset) + + alternates = asset.extra_fields.get("alternate", {}) + assert alternates["s3"]["href"] == "s3://bucket/sentinel-2-l2a/data.zarr" + + +def test_normalize_asset_alternate_schemes_removes_empty(): + """Test empty alternates are removed after normalization.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + # Start with empty dict + asset = Asset(href="s3://bucket/data.zarr", extra_fields={"alternate": {}}) + + normalize_asset_alternate_schemes(asset) + + assert "alternate" not in asset.extra_fields + + +def test_normalize_asset_alternate_schemes_no_extra_fields(): + """Test assets without extra_fields are handled safely.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset(href="s3://bucket/data.zarr") + + # Should not raise + normalize_asset_alternate_schemes(asset) + + assert asset.extra_fields == {} + + +def test_normalize_asset_alternate_schemes_invalid_alternate_type(): + """Test non-dict alternate values are skipped.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset(href="s3://bucket/data.zarr", extra_fields={"alternate": "invalid"}) + + normalize_asset_alternate_schemes(asset) + + # Invalid type is left unchanged + assert asset.extra_fields.get("alternate") == "invalid" + + +def test_normalize_asset_alternate_schemes_missing_href(): + """Test alternate entries without href are skipped.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset( + href="s3://bucket/data.zarr", + extra_fields={ + "alternate": { + "s3": {"title": "S3 access"}, # no href + "https": {"href": "https://example.com/data.zarr"}, + } + }, + ) + + normalize_asset_alternate_schemes(asset) + + alternates = asset.extra_fields.get("alternate", {}) + # Entry without href is unchanged + assert alternates["s3"] == {"title": "S3 access"} + # Entry with href is normalized (unchanged in this case) + assert alternates["https"]["href"] == "https://example.com/data.zarr" + + +def test_normalize_asset_alternate_schemes_combined_transformations(): + """Test both normalization and preview resolution work together.""" + from pystac import Asset + + from scripts.augment_stac_item import normalize_asset_alternate_schemes + + asset = Asset( + href="s3://bucket/sentinel-2-l2a/data.zarr", + extra_fields={ + "alternate": { + "s3": {"href": "https://bucket.s3.gra.io.cloud.ovh.net/previews/data_preview.zarr"}, + } + }, + ) + + normalize_asset_alternate_schemes(asset) + + alternates = asset.extra_fields.get("alternate", {}) + # Should be normalized from HTTPS AND resolved from preview + assert alternates["s3"]["href"] == "s3://bucket/sentinel-2-l2a/data.zarr" diff --git a/tests/unit/test_register_stac.py b/tests/unit/test_register_stac.py new file mode 100644 index 0000000..513f491 --- /dev/null +++ b/tests/unit/test_register_stac.py @@ -0,0 +1,295 @@ +"""Unit tests for register_stac.py.""" + + +def test_remove_proj_code_from_properties(stac_item_with_proj_code): + """Test that proj:code is removed from item properties.""" + from scripts.register_stac import create_geozarr_item + + # Mock 
minimal inputs + item = create_geozarr_item( + source_item=stac_item_with_proj_code, + geozarr_url="s3://bucket/output.zarr", + item_id=None, + collection_id=None, + s3_endpoint="https://s3.example.com", + ) + + # Verify proj:code removed from properties + assert "proj:code" not in item["properties"] + # But proj:epsg should remain + assert "proj:epsg" in item["properties"] + + +def test_remove_proj_epsg_from_assets(stac_item_with_proj_code): + """Test that proj:epsg and proj:code are removed from assets.""" + from scripts.register_stac import create_geozarr_item + + item = create_geozarr_item( + source_item=stac_item_with_proj_code, + geozarr_url="s3://bucket/output.zarr", + item_id=None, + collection_id=None, + s3_endpoint="https://s3.example.com", + ) + + # Check all assets have NO proj:epsg or proj:code + for asset_key, asset_value in item["assets"].items(): + assert "proj:epsg" not in asset_value, f"Asset {asset_key} has proj:epsg" + assert "proj:code" not in asset_value, f"Asset {asset_key} has proj:code" + + +def test_remove_storage_options_from_assets(sample_stac_item): + """Test that storage:options is removed from assets.""" + from scripts.register_stac import create_geozarr_item + + # Add storage:options to source item + source = sample_stac_item.copy() + source["assets"]["B01"]["storage:options"] = {"anon": True} + + item = create_geozarr_item( + source_item=source, + geozarr_url="s3://bucket/output.zarr", + item_id=None, + collection_id=None, + s3_endpoint="https://s3.example.com", + ) + + # Verify storage:options removed + for asset_value in item["assets"].values(): + assert "storage:options" not in asset_value + + +def test_s3_to_https_conversion(): + """Test S3 URL to HTTPS conversion.""" + from scripts.register_stac import s3_to_https + + result = s3_to_https("s3://mybucket/path/to/file.zarr", "https://s3.example.com") + assert result == "https://mybucket.s3.example.com/path/to/file.zarr" + + +def test_derived_from_link_added(sample_stac_item): + """Test that derived_from link is added.""" + from scripts.register_stac import create_geozarr_item + + # Add self link to source + source = sample_stac_item.copy() + source["links"] = [ + { + "rel": "self", + "href": "https://api.example.com/items/test-item", + "type": "application/json", + } + ] + + item = create_geozarr_item( + source_item=source, + geozarr_url="s3://bucket/output.zarr", + item_id=None, + collection_id=None, + s3_endpoint="https://s3.example.com", + ) + + # Check derived_from link exists + derived_links = [link for link in item["links"] if link["rel"] == "derived_from"] + assert len(derived_links) == 1 + assert derived_links[0]["href"] == "https://api.example.com/items/test-item" + + +def test_r60m_overview_path_rewrite(): + """Test that r60m band assets get /0 inserted for overview level.""" + from scripts.register_stac import create_geozarr_item + + source = { + "type": "Feature", + "stac_version": "1.0.0", + "id": "test", + "properties": {"datetime": "2025-01-01T00:00:00Z", "proj:epsg": 32636}, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "links": [], + "assets": { + "B01_60m": { + "href": "s3://bucket/source.zarr/r60m/b01", + "type": "image/tiff", + "roles": ["data"], + } + }, + "collection": "test", + } + + item = create_geozarr_item( + source_item=source, + geozarr_url="s3://bucket/output.zarr", + item_id=None, + collection_id=None, + s3_endpoint="https://s3.example.com", + ) + + # Verify /0 was inserted for r60m + assert "/r60m/0/b01" in item["assets"]["B01_60m"]["href"] + + +def 
+
+
+def test_r10m_no_overview_path():
+    """Test that r10m band paths do NOT get /0 inserted."""
+    from scripts.register_stac import create_geozarr_item
+
+    source = {
+        "type": "Feature",
+        "stac_version": "1.0.0",
+        "id": "test",
+        "properties": {"datetime": "2025-01-01T00:00:00Z", "proj:epsg": 32636},
+        "geometry": {"type": "Point", "coordinates": [0, 0]},
+        "links": [],
+        "assets": {
+            "B02_10m": {
+                "href": "s3://bucket/source.zarr/r10m/b02",
+                "type": "image/tiff",
+                "roles": ["data"],
+            }
+        },
+        "collection": "test",
+    }
+
+    item = create_geozarr_item(
+        source_item=source,
+        geozarr_url="s3://bucket/output.zarr",
+        item_id=None,
+        collection_id=None,
+        s3_endpoint="https://s3.example.com",
+    )
+
+    # Verify NO /0 was inserted for r10m
+    assert "/r10m/b02" in item["assets"]["B02_10m"]["href"]
+    assert "/0/" not in item["assets"]["B02_10m"]["href"]
+
+
+def test_keep_proj_spatial_fields_on_assets(sample_stac_item):
+    """Test that proj:bbox, proj:shape, and proj:transform are kept on assets."""
+    from scripts.register_stac import create_geozarr_item
+
+    # Add spatial fields to the source asset
+    source = sample_stac_item.copy()
+    source["assets"]["B01"]["proj:bbox"] = [600000, 6290220, 709800, 6400020]
+    source["assets"]["B01"]["proj:shape"] = [10980, 10980]
+    source["assets"]["B01"]["proj:transform"] = [10, 0, 600000, 0, -10, 6400020]
+
+    item = create_geozarr_item(
+        source_item=source,
+        geozarr_url="s3://bucket/output.zarr",
+        item_id=None,
+        collection_id=None,
+        s3_endpoint="https://s3.example.com",
+    )
+
+    # These should be preserved
+    asset = item["assets"]["B01"]
+    assert "proj:bbox" in asset
+    assert "proj:shape" in asset
+    assert "proj:transform" in asset
+
+
+def test_normalize_asset_href_basic():
+    """Test normalize_asset_href for simple r60m paths."""
+    from scripts.register_stac import normalize_asset_href
+
+    # Should insert /0 for r60m bands
+    result = normalize_asset_href("s3://bucket/data.zarr/r60m/b01")
+    assert result == "s3://bucket/data.zarr/r60m/0/b01"
+
+    result = normalize_asset_href("s3://bucket/data.zarr/r60m/b09")
+    assert result == "s3://bucket/data.zarr/r60m/0/b09"
+
+
+def test_normalize_asset_href_complex_paths():
+    """Test normalize_asset_href with complex base paths."""
+    from scripts.register_stac import normalize_asset_href
+
+    # Complex S3 path
+    result = normalize_asset_href(
+        "s3://eodc-sentinel-2/products/2025/S2A_MSIL2A.zarr/measurements/reflectance/r60m/b01"
+    )
+    expected = (
+        "s3://eodc-sentinel-2/products/2025/S2A_MSIL2A.zarr/measurements/reflectance/r60m/0/b01"
+    )
+    assert result == expected
+
+    # HTTPS path
+    result = normalize_asset_href("https://example.com/data.zarr/quality/r60m/scene_classification")
+    expected = "https://example.com/data.zarr/quality/r60m/0/scene_classification"
+    assert result == expected
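+
+
+# The rewrite rule exercised above only applies to r60m groups: insert the
+# overview level "0" between r60m and the band. A minimal sketch, assuming the
+# band name never contains a slash (illustrative; the real implementation is
+# in scripts/register_stac.py):
+def _sketch_normalize_asset_href(href: str) -> str:
+    prefix, sep, leaf = href.rpartition("/r60m/")
+    if sep and "/" not in leaf:
+        return f"{prefix}/r60m/0/{leaf}"
+    return href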
item["properties"] + assert "proj:transform" not in item["properties"] + assert "proj:code" not in item["properties"] + assert "proj:bbox" in item["properties"] # Should be kept + assert "proj:epsg" in item["properties"] # Should be kept + + # Check assets cleaned + assert "proj:epsg" not in item["assets"]["band1"] + assert "proj:code" not in item["assets"]["band1"] + assert "storage:options" not in item["assets"]["band1"] + assert "href" in item["assets"]["band1"] # Should be kept + + assert "proj:epsg" not in item["assets"]["band2"] + assert "href" in item["assets"]["band2"] + + +def test_find_source_zarr_base(): + """Test extracting base Zarr URL from source item assets.""" + from scripts.register_stac import find_source_zarr_base + + # Test with .zarr/ in path + source_item = { + "assets": { + "product": {"href": "s3://bucket/data.zarr/measurements/b01"}, + "metadata": {"href": "https://example.com/metadata.json"}, + } + } + result = find_source_zarr_base(source_item) + assert result == "s3://bucket/data.zarr/" + + # Test with .zarr at end + source_item = {"assets": {"product": {"href": "s3://bucket/data.zarr"}}} + result = find_source_zarr_base(source_item) + assert result == "s3://bucket/data.zarr/" + + # Test with no zarr assets + source_item = {"assets": {"metadata": {"href": "https://example.com/metadata.json"}}} + result = find_source_zarr_base(source_item) + assert result is None + + # Test with no assets + source_item = {} + result = find_source_zarr_base(source_item) + assert result is None diff --git a/validate-setup.sh b/validate-setup.sh new file mode 100755 index 0000000..d97b6a6 --- /dev/null +++ b/validate-setup.sh @@ -0,0 +1,129 @@ +#!/bin/bash +# Validate data-pipeline setup +# Run this after following GETTING_STARTED.md to verify everything works + +set -euo pipefail + +NAMESPACE="${NAMESPACE:-devseed}" +PASS=0 +FAIL=0 + +echo "==========================================" +echo "🔍 Data Pipeline Setup Validation" +echo "==========================================" +echo "" + +# Function to check and report +check() { + local name="$1" + local command="$2" + + echo -n " Checking $name... " + if eval "$command" &>/dev/null; then + echo "✅" + ((PASS++)) + return 0 + else + echo "❌" + ((FAIL++)) + return 1 + fi +} + +# 1. kubectl access +echo "📋 Step 1: kubectl Configuration" +check "kubectl installed" "command -v kubectl" +check "KUBECONFIG set" "test -n \"\${KUBECONFIG:-}\"" +check "cluster access" "kubectl get nodes" +check "namespace exists" "kubectl get namespace $NAMESPACE" +echo "" + +# 2. Infrastructure deployed +echo "📋 Step 2: Pipeline Infrastructure" +check "RBAC (ServiceAccount)" "kubectl get serviceaccount operate-workflow-sa -n $NAMESPACE" +check "RBAC (Role)" "kubectl get role operate-workflow-creator -n $NAMESPACE" +check "RBAC (RoleBinding)" "kubectl get rolebinding operate-workflow-creator-binding -n $NAMESPACE" +check "EventSource" "kubectl get eventsource rabbitmq-geozarr -n $NAMESPACE" +check "Sensor" "kubectl get sensor geozarr-sensor -n $NAMESPACE" +check "WorkflowTemplate" "kubectl get workflowtemplate geozarr-pipeline -n $NAMESPACE" +echo "" + +# 3. 
diff --git a/validate-setup.sh b/validate-setup.sh
new file mode 100755
index 0000000..d97b6a6
--- /dev/null
+++ b/validate-setup.sh
@@ -0,0 +1,129 @@
+#!/bin/bash
+# Validate data-pipeline setup
+# Run this after following GETTING_STARTED.md to verify everything works
+
+set -euo pipefail
+
+NAMESPACE="${NAMESPACE:-devseed}"
+PASS=0
+FAIL=0
+
+echo "=========================================="
+echo "🔍 Data Pipeline Setup Validation"
+echo "=========================================="
+echo ""
+
+# Function to check and report.
+# Counters use arithmetic assignment rather than ((PASS++)): a ((...)) that
+# evaluates to 0 returns status 1 and would abort the script under `set -e`.
+check() {
+    local name="$1"
+    local command="$2"
+
+    echo -n "  Checking $name... "
+    if eval "$command" &>/dev/null; then
+        echo "✅"
+        PASS=$((PASS + 1))
+    else
+        echo "❌"
+        FAIL=$((FAIL + 1))
+    fi
+    # Always return success: failures are tallied in FAIL rather than letting
+    # `set -e` abort the run on the first failed check.
+    return 0
+}
+
+# 1. kubectl access
+echo "📋 Step 1: kubectl Configuration"
+check "kubectl installed" "command -v kubectl"
+check "KUBECONFIG set" "test -n \"\${KUBECONFIG:-}\""
+check "cluster access" "kubectl get nodes"
+check "namespace exists" "kubectl get namespace $NAMESPACE"
+echo ""
+
+# 2. Infrastructure deployed
+echo "📋 Step 2: Pipeline Infrastructure"
+check "RBAC (ServiceAccount)" "kubectl get serviceaccount operate-workflow-sa -n $NAMESPACE"
+check "RBAC (Role)" "kubectl get role operate-workflow-creator -n $NAMESPACE"
+check "RBAC (RoleBinding)" "kubectl get rolebinding operate-workflow-creator-binding -n $NAMESPACE"
+check "EventSource" "kubectl get eventsource rabbitmq-geozarr -n $NAMESPACE"
+check "Sensor" "kubectl get sensor geozarr-sensor -n $NAMESPACE"
+check "WorkflowTemplate" "kubectl get workflowtemplate geozarr-pipeline -n $NAMESPACE"
+echo ""
+
+# 3. Core services (from platform-deploy)
+echo "📋 Step 3: Core Services"
+check "RabbitMQ deployed" "kubectl get pods -n core -l app.kubernetes.io/name=rabbitmq | grep -q Running"
+check "RabbitMQ secret exists" "kubectl get secret rabbitmq-password -n core"
+check "Argo Workflows deployed" "kubectl get pods -n core -l app.kubernetes.io/name=argo-workflows-server | grep -q Running"
+check "STAC API reachable" "curl -sf https://api.explorer.eopf.copernicus.eu/stac/ -o /dev/null"
+check "Raster API reachable" "curl -sf https://api.explorer.eopf.copernicus.eu/raster/ -o /dev/null"
+echo ""
+
+# 4. Python environment
+echo "📋 Step 4: Python Environment"
+check "Python 3.11+" "python3 -c 'import sys; sys.exit(0 if sys.version_info >= (3, 11) else 1)'"
+
+if command -v uv &>/dev/null; then
+    check "uv installed" "command -v uv"
+    check "dependencies synced" "test -f .venv/bin/python"
+else
+    check "pip installed" "command -v pip"
+    check "pika installed" "python3 -c 'import pika'"
+    check "click installed" "python3 -c 'import click'"
+fi
+echo ""
+
+# 5. Sensor status (check whether it is receiving messages)
+echo "📋 Step 5: Event Processing"
+SENSOR_POD=$(kubectl get pods -n "$NAMESPACE" -l sensor-name=geozarr-sensor -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+if [ -n "$SENSOR_POD" ]; then
+    check "Sensor pod running" "kubectl get pod $SENSOR_POD -n $NAMESPACE | grep -q Running"
+
+    # Check whether the sensor has logged any activity (informational only)
+    if kubectl logs -n "$NAMESPACE" "$SENSOR_POD" --tail=10 2>/dev/null | grep -q "sensor"; then
+        echo "  Sensor logs present... ✅"
+        PASS=$((PASS + 1))
+    else
+        echo "  Sensor logs empty (no jobs yet)... ⚠️ (not an error)"
+    fi
+else
+    echo "  Sensor pod not found... ❌"
+    FAIL=$((FAIL + 1))
+fi
+echo ""
+
+# Summary
+echo "=========================================="
+echo "📊 Validation Summary"
+echo "=========================================="
+echo "✅ Passed: $PASS"
+echo "❌ Failed: $FAIL"
+echo ""
+
+if [ "$FAIL" -eq 0 ]; then
+    echo "🎉 Setup complete! You're ready to submit jobs."
+    echo ""
+    echo "Next steps:"
+    echo "  1. Port-forward RabbitMQ:"
+    echo "     kubectl port-forward -n core svc/rabbitmq 5672:5672 &"
+    echo ""
+    echo "  2. Get the RabbitMQ password and submit:"
+    echo "     export AMQP_URL=\"amqp://user:\$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)@localhost:5672/\""
+    echo "     uv run python examples/submit.py \\"
+    echo "       --stac-url \"https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2B_...\" \\"
+    echo "       --collection \"sentinel-2-l2a-dp-test\""
+    echo ""
+    echo "  3. Monitor:"
+    echo "     kubectl get workflows -n devseed -w"
+    echo ""
+    exit 0
+else
+    echo "❌ Setup incomplete. Please fix the failed checks above."
+    echo ""
+    echo "Common fixes:"
+    echo "  - Missing infrastructure: kubectl apply -f workflows/rbac.yaml -n $NAMESPACE"
+    echo "  - No cluster access: check that KUBECONFIG points to a valid file"
+    echo "  - Platform services down: check platform-deploy status"
+    echo ""
+    echo "See GETTING_STARTED.md for detailed setup instructions."
+    echo ""
+    exit 1
+fi
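+
+# Usage (KUBECONFIG must already point at the target cluster):
+#   ./validate-setup.sh                              # checks the default 'devseed' namespace
+#   NAMESPACE=<your-namespace> ./validate-setup.sh   # checks any other namespace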