diff --git a/.github/workflows/deploy-unav-v2-modal.yml b/.github/workflows/deploy-unav-v2-modal.yml index 7f30b61..1cd0a76 100644 --- a/.github/workflows/deploy-unav-v2-modal.yml +++ b/.github/workflows/deploy-unav-v2-modal.yml @@ -17,6 +17,7 @@ on: - t4 - any - a100 + - h200 ram_mb: description: "RAM reservation in MiB (max in this workflow: 98304)" required: true @@ -58,8 +59,8 @@ jobs: if parsed <= 0: raise ValueError("scaledown_window must be a positive integer") gpu = os.environ["UNAV_GPU_TYPE"].strip().lower() - if gpu not in {"a10", "t4", "any", "a100"}: - raise ValueError("gpu_type must be one of: a10, t4, any, a100") + if gpu not in {"a10", "t4", "any", "a100", "h200"}: + raise ValueError("gpu_type must be one of: a10, t4, any, a100, h200") ram_mb = int(os.environ["UNAV_RAM_MB"]) max_ram_mb = 98304 if ram_mb <= 0: diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..42f3adb --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,197 @@ +# UNav-Server Agent Guidelines + +This document provides guidelines for agents working on the UNav-Server codebase. + +## Project Overview + +UNav-Server provides a serverless implementation for indoor navigation using computer vision. It leverages Modal for deployment and offers features like visual localization, path planning, and navigation guidance. + +## Project Structure + +``` +UNav-Server/ +├── src/ +│ └── modal_functions/ +│ ├── unav_v1/ # Legacy version +│ ├── unav_v2/ # Current production version +│ │ ├── unav_modal.py # Main Modal app (~200 lines) +│ │ ├── logic/ # Extracted business logic +│ │ │ ├── __init__.py # Exports all run_* functions +│ │ │ ├── navigation.py # run_planner, run_localize_user +│ │ │ ├── init.py # Initialization & monkey-patching +│ │ │ ├── places.py # run_get_places, run_get_fallback_places +│ │ │ ├── maps.py # run_ensure_maps_loaded +│ │ │ ├── utils.py # run_safe_serialize, etc. 
+│ │ │ └── vlm.py # run_vlm_on_image +│ │ ├── server_methods/ +│ │ │ └── helpers.py # Queue utility functions +│ │ ├── test_modal_functions.py +│ │ ├── modal_config.py +│ │ ├── deploy_config.py +│ │ ├── destinations_service.py +│ │ └── media/ # Test images +│ └── volume_utils/ # Volume management utilities +├── .github/workflows/ # CI/CD workflows +├── requirements.txt # Python dependencies +└── TODO.md # Technical documentation +``` + +## Build/Lint/Test Commands + +### Python Version +- Minimum: Python 3.10+ +- Recommended: Python 3.11 (used in CI/CD) + +### Setup +```bash +# Create virtual environment +python -m venv .venv + +# Activate (macOS/Linux) +source .venv/bin/activate + +# Install dependencies +pip install -r requirements.txt +``` + +### Running Tests +```bash +# Navigate to the module directory +cd src/modal_functions/unav_v2 + +# Run a single test file +python test_modal_functions.py + +# Run with pytest (if installed) +pytest test_modal_functions.py -v +``` + +### Deployment Commands +```bash +# Deploy to Modal (from unav_v2 directory) +cd src/modal_functions/unav_v2 +modal deploy unav_modal.py + +# Deploy with custom parameters +UNAV_SCALEDOWN_WINDOW=600 UNAV_GPU_TYPE=t4 UNAV_RAM_MB=73728 modal deploy unav_modal.py +``` + +### GitHub Actions Deployment +1. Go to Actions -> "Deploy UNav v2 Modal" -> "Run workflow" +2. Set inputs: scaledown_window, gpu_type, ram_mb +3. Requires secrets: MODAL_TOKEN_ID, MODAL_TOKEN_SECRET + +## Code Style Guidelines + +### Import Organization +Order: stdlib -> third-party -> local imports, with blank lines between groups. 
+ +```python +import os +import json +from typing import Dict, List, Any, Optional + +import modal +import cv2 +import numpy as np + +from .deploy_config import get_scaledown_window +from .logic import run_planner, run_localize_user +``` + +### Naming Conventions +- **Functions/variables**: snake_case (e.g., `get_destinations_list`, `image_data`) +- **Classes**: PascalCase (e.g., `UnavServer`, `FacilityNavigator`) +- **Constants**: UPPER_SNAKE_CASE (e.g., `BUILDING`, `PLACE`) +- **Logic functions**: prefix with `run_` (e.g., `run_planner`, `run_safe_serialize`) +- **Private methods**: prefix with underscore (e.g., `_configure_middleware_tracing`) + +### Type Hints +Use type hints for function parameters and return values. + +```python +def get_destinations_list_impl( + server: Any, + floor: str = "6_floor", + place: str = "New_York_City", + enable_multifloor: bool = False, +) -> Dict[str, Any]: +``` + +### Refactoring Pattern: Logic Extraction + +When extracting code from `unav_modal.py`: + +1. **Keep `@method()` decorators in `unav_modal.py`** - Modal requires them +2. **Move logic to `logic/` directory** - Each function gets `run_` prefix +3. **Thin wrapper pattern** - Method in unav_modal.py just calls the logic function + +```python +# unav_modal.py - thin wrapper +@method() +def planner(self, session_id: str, ...): + return run_planner(self, session_id=session_id, ...) + +# logic/navigation.py - actual logic +def run_planner(self, session_id: str, ...) -> Dict[str, Any]: + # Full implementation here + pass +``` + +**DO NOT create wrapper methods** for internal functions (e.g., `get_session`, `update_session`) - call `run_*` functions directly from logic modules. 
+ +### Error Handling +- Use try/except blocks for operations that may fail +- Catch specific exceptions when possible +- Return error dictionaries for recoverable errors +- Use print statements with emojis for logging + +```python +try: + result = some_function() +except ValueError as e: + print(f"❌ Invalid value: {e}") + return {"status": "error", "message": str(e)} +except Exception as e: + print(f"❌ Unexpected error: {e}") + raise +``` + +### Code Formatting +- Maximum line length: 100 characters (soft limit) +- Use 4 spaces for indentation (no tabs) +- Use blank lines to separate logical sections +- Use trailing commas in multi-line collections +- Use f-strings for string interpolation + +### Logging Patterns +- `print("🔧 [Phase X] ...")` - Initialization steps +- `print("✅ ...")` - Success messages +- `print("⚠️ ...")` - Warnings +- `print("❌ ...")` - Errors +- `print(f"[DEBUG] ...")` - Debug info + +### Testing Guidelines +- Test files: `test_modal_functions.py` +- Use descriptive test parameters (BUILDING, PLACE, FLOOR, etc.) 
+- Include error handling for Modal class lookup +- Test both success and failure paths when applicable + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| UNAV_SCALEDOWN_WINDOW | 300 | Modal scaledown window (seconds) | +| UNAV_GPU_TYPE | t4 | GPU type (a10, t4, a100, any, h200) | +| UNAV_RAM_MB | 73728 | RAM reservation in MiB | +| MODAL_TOKEN_ID | - | Modal token (GitHub secret) | +| MODAL_TOKEN_SECRET | - | Modal secret (GitHub secret) | + +## Notes for Agents + +- This is a Modal-based serverless application +- Tests require a deployed Modal app to run against +- The codebase uses the unav-core library internally (runtime dependency - LSP errors are expected locally) +- Code changes may require redeployment to take effect +- Check TODO.md for technical context on implementation decisions +- Runtime imports (torch, unav, middleware, google.genai) only exist in Modal container diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..296de93 --- /dev/null +++ b/TODO.md @@ -0,0 +1,121 @@ +# UNav-VIS4ION Alignment Tasks + +## Context +Output differences between Modal.com and static GPU come from the **wrapper/orchestration layer**, not from unav-core itself. + +## Root Cause Analysis + +### Why unav-server (static GPU) works better: +1. **Map loading**: Loads ALL floors at startup (`localizer.load_maps_and_features()`) +2. **Session persistence in memory across requests**: Queue persists +3. **Queue works**: Subsequent calls use refinement queue → better accuracy + +### Why Modal fails: +1. **Floor-locked maps**: Only loads target floor, not all floors +2. **No session persistence**: `enable_memory_snapshot=False`, each call hits different container +3. **Queue doesn't work**: Session/queue lost on cold-start = **ALWAYS cold start** + +This is why X-value drifts - every single localization is cold-start! 
+ +## Codex Comparison Results +- Test case: `vianys_640_360_langone_elevator.jpeg`, NYU Langone, destination context set to `15_floor` +- Floor-lock issue: 0/5 successful localizations (floor-locked) → 5/5 successful (multifloor) +- Cold-start stabilization: XY error improved from 160.79 px → 58.90 px → 47.92 px + +--- + +## Changes Implemented (Chronological) + +### 1. Enable Multifloor by Default +- **Changed**: `enable_multifloor` default from `False` to `True` in both `planner()` and `localize_user()` +- **Why**: Matches unav-server behavior - loads all floors for the building instead of just target floor +- **Impact**: Fixes 0/5 → 5/5 success rate + +### 2. Queue Bucketing by Image Shape +- **Added** helper functions: + - `_get_queue_key_for_image_shape()` - generates queue key based on `image.shape[:2]` (e.g., "360x640") + - `_get_refinement_queue_for_map()` - retrieves queue for specific map_key and queue_key + - `_update_refinement_queue()` - updates queue for specific map_key and queue_key +- **Modified** queue structure to be nested: `{best_map_key: {queue_key: {pairs, initial_poses, pps}}}` +- **Note**: Less critical since queue doesn't persist in serverless anyway + +### 3. Cold-Start Multi-Pass Stabilization (v1 - 2 passes) +- **Initial**: Ran 2 localization passes on cold-start, averaged results +- **Why**: Since queue doesn't work in serverless, each request needs self-correction + +### 4. Cold-Start Multi-Pass Stabilization (v2 - 3 passes) +- **Changed**: Upgraded from 2 passes to 3 passes +- **Updated**: bootstrap_mode label from "mean_pass2_pass3" to "mean_all_passes" +- **Why**: Better averaging with more samples, diminishing returns after 3 but still improved + +### 5. 
Add Debug Fields +Added `debug_info` to responses: +- `map_scope`: "building_level_multifloor" or "floor_locked" +- `bootstrap_mode`: "mean_all_passes", "single_pass", or "none" +- `bootstrap_passes`: number of passes run +- `queue_key`: image shape bucket key +- `n_frames`: number of frames in queue +- `top_candidates_count`: number of VPR candidates + +--- + +## Technical Details + +### Helper Functions (lines 13-42) +```python +def _get_queue_key_for_image_shape(image_shape): + """Get a queue key based on image shape for bucket-based refinement queue handling.""" + if image_shape is None: + return "default" + h, w = image_shape[:2] + return f"{h}x{w}" + +def _get_refinement_queue_for_map(queue_dict, map_key, queue_key): + """Get the refinement queue for a specific map_key and queue_key.""" + ... + +def _update_refinement_queue(queue_dict, map_key, queue_key, new_queue_state): + """Update the refinement queue for a specific map_key and queue_key.""" + ... +``` + +### Cold-Start Stabilization Logic +- Triggered when `len(refinement_queue) == 0` (always in serverless) +- Runs 3 bootstrap passes on empty queue +- Averages XY coordinates and angle from all 3 passes +- Updates queue after each pass for next iteration +- Falls back to single pass if fewer than 2 succeed + +### Code Locations +- `planner()`: lines ~1470-1520 +- `localize_user()`: lines ~1902-1952 + +--- + +## Expected Improvements + +| Metric | Before | After | +|--------|--------|-------| +| Success rate (floor-locked) | 0/5 | 5/5 | +| XY error (cold-start) | ~160px | ~50px | +| Map scope | floor-locked | building-level | + +--- + +## Future Considerations + +1. **Enable memory snapshots**: Could persist queue across cold-starts (but adds ~5-10s restore time) +2. **Client-side queue**: Pass queue with each request +3. **External storage**: Redis for queue persistence +4. 
**top_k optimization**: Experiment with different top_k values: + - Default (None) uses config value (~10-20) + - Lower top_k = faster but fewer candidates + - Higher top_k = slower but more candidates to match against + - Consider making it dynamic based on image quality + +--- + +## Notes +- Test image: `vianys_640_360_langone_elevator.jpeg` +- Expected floor: `15_floor` +- Reference mean output should be used for comparison diff --git a/src/modal_functions/unav_v2/deploy_config.py b/src/modal_functions/unav_v2/deploy_config.py index 87c5677..224272d 100644 --- a/src/modal_functions/unav_v2/deploy_config.py +++ b/src/modal_functions/unav_v2/deploy_config.py @@ -27,6 +27,7 @@ def get_gpu_config() -> List[str]: "a10": "A10", "a10g": "A10", "a100": "A100", + "h200": "H200", "any": "any", } diff --git a/src/modal_functions/unav_v2/destinations_service.py b/src/modal_functions/unav_v2/destinations_service.py index 03105f3..0ef4af0 100644 --- a/src/modal_functions/unav_v2/destinations_service.py +++ b/src/modal_functions/unav_v2/destinations_service.py @@ -1,5 +1,8 @@ from typing import Any +from .logic.places import run_get_places +from .logic.maps import run_ensure_maps_loaded + def get_destinations_list_impl( server: Any, @@ -32,15 +35,17 @@ def _run(): print(f"🎯 [Phase 3] Getting destinations for {place}/{building}/{floor}") # Ensure maps are loaded for this location. 
- server.ensure_maps_loaded( - place, - building, + run_ensure_maps_loaded( + server=server, + place=place, + building=building, floor=floor, enable_multifloor=enable_multifloor, ) if enable_multifloor: - places = server.get_places( + places = run_get_places( + server, target_place=place, target_building=building, enable_multifloor=True, diff --git a/src/modal_functions/unav_v2/logic/__init__.py b/src/modal_functions/unav_v2/logic/__init__.py new file mode 100644 index 0000000..5d4301e --- /dev/null +++ b/src/modal_functions/unav_v2/logic/__init__.py @@ -0,0 +1,39 @@ +from .navigation import run_planner, run_localize_user +from .init import ( + run_init_middleware, + run_init_cpu_components, + run_init_gpu_components, + run_monkey_patch_localizer_methods, + run_monkey_patch_pose_refinement, + run_monkey_patch_feature_extractors, + run_monkey_patch_matching_and_ransac, +) +from .places import run_get_places, run_get_fallback_places +from .maps import run_ensure_maps_loaded +from .utils import ( + run_construct_mock_localization_output, + run_convert_navigation_to_trajectory, + run_set_navigation_context, + run_safe_serialize, +) +from .vlm import run_vlm_on_image + +__all__ = [ + "run_planner", + "run_localize_user", + "run_init_middleware", + "run_init_cpu_components", + "run_init_gpu_components", + "run_monkey_patch_localizer_methods", + "run_monkey_patch_pose_refinement", + "run_monkey_patch_feature_extractors", + "run_monkey_patch_matching_and_ransac", + "run_get_places", + "run_get_fallback_places", + "run_ensure_maps_loaded", + "run_construct_mock_localization_output", + "run_convert_navigation_to_trajectory", + "run_set_navigation_context", + "run_safe_serialize", + "run_vlm_on_image", +] diff --git a/src/modal_functions/unav_v2/logic/init.py b/src/modal_functions/unav_v2/logic/init.py new file mode 100644 index 0000000..f2b33bd --- /dev/null +++ b/src/modal_functions/unav_v2/logic/init.py @@ -0,0 +1,378 @@ +""" +Initialization and setup methods for 
UnavServer. +These are called during container startup. +""" +from modal import enter + +from .places import run_get_places + + +def run_init_middleware(self): + """Initialize Middleware.io tracking for profiling and telemetry.""" + print("🔧 [Phase 0] Initializing Middleware.io...") + print(f"🔧 [Phase 0] Current tracer state: {getattr(self, 'tracer', 'NOT_SET')}") + print(f"🔧 [Phase 0] Middleware init pending: {getattr(self, '_middleware_init_pending', 'NOT_SET')}") + + if not _gpu_available(): + print("⏸️ GPU not yet available; deferring Middleware.io initialization...") + self._middleware_init_pending = True + self.tracer = None + return + + self._middleware_init_pending = False + print("✅ GPU available; proceeding with Middleware.io initialization") + _configure_middleware_tracing(self) + + +def run_init_cpu_components(self): + """Initialize CPU-only components that can be safely snapshotted.""" + print("🚀 [Phase 1] Initializing CPU components for snapshotting...") + + from unav.config import UNavConfig + from unav.navigator.multifloor import FacilityNavigator + from unav.navigator.commander import commands_from_result + + self.DATA_ROOT = "/root/UNav-IO/data" + self.FEATURE_MODEL = "DinoV2Salad" + self.LOCAL_FEATURE_MODEL = "superpoint+lightglue" + self.PLACES = run_get_places(self) + + print("🔧 Initializing UNavConfig...") + self.config = UNavConfig( + data_final_root=self.DATA_ROOT, + places=self.PLACES, + global_descriptor_model=self.FEATURE_MODEL, + local_feature_model=self.LOCAL_FEATURE_MODEL, + ) + print("✅ UNavConfig initialized successfully") + + self.localizor_config = self.config.localizer_config + self.navigator_config = self.config.navigator_config + print("✅ Config objects extracted successfully") + + print("🧭 Initializing FacilityNavigator (CPU-only)...") + self.nav = FacilityNavigator(self.navigator_config) + print("✅ FacilityNavigator initialized successfully") + + self.commander = commands_from_result + self.maps_loaded = set() + 
self.selective_localizers = {} + self.cpu_components_initialized = True + print("📸 CPU components ready for snapshotting!") + + +def run_init_gpu_components(self): + """Initialize GPU-dependent components that cannot be snapshotted.""" + print("🚀 [Phase 2] Initializing GPU components after snapshot restoration...") + + try: + import torch + cuda_available = torch.cuda.is_available() + print(f"[GPU DEBUG] torch.cuda.is_available(): {cuda_available}") + + if not cuda_available: + print("[GPU ERROR] CUDA not available! Raising exception...") + raise RuntimeError("GPU not available when required.") + + print(f"[GPU DEBUG] torch.cuda.device_count(): {torch.cuda.device_count()}") + print(f"[GPU DEBUG] torch.cuda.current_device(): {torch.cuda.current_device()}") + print(f"[GPU DEBUG] torch.cuda.get_device_name(0): {torch.cuda.get_device_name(0)}") + except Exception as gpu_debug_exc: + print(f"[GPU DEBUG] Error printing GPU info: {gpu_debug_exc}") + if "GPU not available when required" in str(gpu_debug_exc): + raise + + if not hasattr(self, "cpu_components_initialized"): + print("⚠️ CPU components not initialized, initializing now...") + run_init_cpu_components(self) + + from unav.localizer.localizer import UNavLocalizer + + print("🤖 Initializing UNavLocalizer (GPU-dependent)...") + self.localizer = UNavLocalizer(self.localizor_config) + + try: + self._monkey_patch_localizer_methods(self.localizer) + self._monkey_patch_pose_refinement() + self._monkey_patch_feature_extractors() + except Exception as e: + print(f"⚠️ Failed to monkey-patch UNavLocalizer methods: {e}") + + print("✅ UNavLocalizer initialized (maps will load on demand)") + + self.gpu_components_initialized = True + print("🎉 Full UNav system initialization complete! 
Ready for fast inference.") + print(f"🎉 [Phase 2] Checking for deferred middleware init: _middleware_init_pending={getattr(self, '_middleware_init_pending', 'NOT_SET')}") + + if getattr(self, "_middleware_init_pending", False): + print("🔁 GPU acquired; completing deferred Middleware.io initialization...") + self._middleware_init_pending = False + _configure_middleware_tracing(self) + try: + self._monkey_patch_localizer_methods(self.localizer) + self._monkey_patch_pose_refinement() + self._monkey_patch_matching_and_ransac() + self._monkey_patch_feature_extractors() + except Exception as e: + print(f"⚠️ Failed to re-patch after deferred init: {e}") + else: + print("✅ [Phase 2] No deferred middleware initialization needed") + + +def _gpu_available() -> bool: + """Utility to detect whether CUDA GPUs are currently accessible.""" + try: + import torch + available = torch.cuda.is_available() + print(f"[GPU CHECK] torch.cuda.is_available(): {available}") + return available + except Exception as exc: + print(f"[GPU CHECK] Unable to determine GPU availability: {exc}") + return False + + +def _configure_middleware_tracing(self): + """Configure Middleware.io tracing.""" + print("🔧 [CONFIGURE] Starting Middleware.io configuration...") + from middleware import mw_tracker, MWOptions + from opentelemetry import trace + import os + + api_key = os.environ.get("MW_API_KEY") + target = os.environ.get("MW_TARGET") + + if not api_key or not target: + print("⚠️ Warning: MW_API_KEY and MW_TARGET not set. 
Skipping middleware initialization.") + self.tracer = None + return + + try: + mw_tracker( + MWOptions( + access_token=api_key, + target=target, + service_name="UNav-Server", + console_exporter=False, + log_level="INFO", + collect_profiling=True, + collect_traces=True, + collect_metrics=True, + ) + ) + + self.tracer = trace.get_tracer(__name__) + print("✅ Middleware.io initialized successfully") + print(f"✅ [CONFIGURE] Tracer created: {type(self.tracer).__name__}") + except Exception as e: + print(f"⚠️ Warning: Failed to initialize Middleware.io: {e}") + self.tracer = None + print(f"⚠️ [CONFIGURE] Tracer set to None due to error") + + +def run_monkey_patch_localizer_methods(self, localizer, method_names=None): + """Add spans to internal UNavLocalizer methods by monkey-patching.""" + import os + import functools + import inspect + + if not hasattr(self, "tracer") or not self.tracer: + return + + tracer = self.tracer + default_candidates = [ + "extract_query_features", + "vpr_retrieve", + "get_candidates_data", + "batch_local_matching_and_ransac", + "multi_frame_pose_refine", + "transform_pose_to_floorplan", + ] + internal_components = ["global_extractor", "local_extractor", "local_matcher"] + + override = os.getenv("MW_UNAV_TRACE_METHODS") + if override: + method_names = [m.strip() for m in override.split(",") if m.strip()] + else: + method_names = method_names or (default_candidates + internal_components) + + def _wrap(orig, name): + if getattr(orig, "__mw_wrapped__", False): + return orig + + if inspect.iscoroutinefunction(orig): + async def _async_wrapper(*args, **kwargs): + with tracer.start_as_current_span(f"unav.{name}") as span: + try: + return await orig(*args, **kwargs) + except Exception as exc: + span.record_exception(exc) + raise + _async_wrapper.__mw_wrapped__ = True + return functools.wraps(orig)(_async_wrapper) + else: + def _sync_wrapper(*args, **kwargs): + with tracer.start_as_current_span(f"unav.{name}") as span: + try: + return orig(*args, 
**kwargs) + except Exception as exc: + span.record_exception(exc) + raise + _sync_wrapper.__mw_wrapped__ = True + return functools.wraps(orig)(_sync_wrapper) + + patched = [] + for mname in method_names: + if hasattr(localizer, mname): + try: + orig = getattr(localizer, mname) + wrapped = _wrap(orig, mname) + setattr(localizer, mname, wrapped) + patched.append(mname) + except Exception as e: + print(f"⚠️ Failed to patch {mname}: {e}") + continue + + if patched: + print(f"🔧 Patched localizer methods for tracing: {patched}") + + +def run_monkey_patch_pose_refinement(self): + """Patch pose refinement libraries.""" + if not hasattr(self, "tracer") or not self.tracer: + return + + tracer = self.tracer + import functools + + try: + import poselib + if not getattr(poselib, "__mw_patched__", False): + original_estimate = poselib.estimate_1D_radial_absolute_pose + + @functools.wraps(original_estimate) + def traced_estimate(*args, **kwargs): + with tracer.start_as_current_span("unav.poselib.estimate_1D_radial"): + return original_estimate(*args, **kwargs) + + poselib.estimate_1D_radial_absolute_pose = traced_estimate + poselib.__mw_patched__ = True + print("🔧 Patched poselib.estimate_1D_radial_absolute_pose") + except Exception as e: + print(f"⚠️ Failed to patch poselib: {e}") + + try: + import pyimplicitdist + if not getattr(pyimplicitdist, "__mw_patched__", False): + original_refine_1d = pyimplicitdist.pose_refinement_1D_radial + + @functools.wraps(original_refine_1d) + def traced_refine_1d(*args, **kwargs): + with tracer.start_as_current_span("unav.pyimplicitdist.pose_refinement_1D_radial"): + return original_refine_1d(*args, **kwargs) + + pyimplicitdist.pose_refinement_1D_radial = traced_refine_1d + + original_build_cm = pyimplicitdist.build_cost_matrix_multi + + @functools.wraps(original_build_cm) + def traced_build_cm(*args, **kwargs): + with tracer.start_as_current_span("unav.pyimplicitdist.build_cost_matrix_multi"): + return original_build_cm(*args, **kwargs) + + 
pyimplicitdist.build_cost_matrix_multi = traced_build_cm + + original_refine_multi = pyimplicitdist.pose_refinement_multi + + @functools.wraps(original_refine_multi) + def traced_refine_multi(*args, **kwargs): + with tracer.start_as_current_span("unav.pyimplicitdist.pose_refinement_multi"): + return original_refine_multi(*args, **kwargs) + + pyimplicitdist.pose_refinement_multi = traced_refine_multi + pyimplicitdist.__mw_patched__ = True + print("🔧 Patched pyimplicitdist functions") + except Exception as e: + print(f"⚠️ Failed to patch pyimplicitdist: {e}") + + +def run_monkey_patch_feature_extractors(self): + """Patch feature extraction pipeline.""" + if not hasattr(self, "tracer") or not self.tracer: + return + + tracer = self.tracer + import functools + + try: + from unav.localizer.tools import feature_extractor + + if not getattr(feature_extractor, "__mw_patched__", False): + original_extract = feature_extractor.extract_query_features + + @functools.wraps(original_extract) + def traced_extract_query_features(query_img, global_extractor, local_extractor, global_model_name, device): + with tracer.start_as_current_span("unav.extract_query_features"): + return original_extract(query_img, global_extractor, local_extractor, global_model_name, device) + + feature_extractor.extract_query_features = traced_extract_query_features + feature_extractor.__mw_patched__ = True + print("🔧 Patched extract_query_features with tracing wrapper") + except Exception as e: + print(f"⚠️ Failed to patch extract_query_features: {e}") + + try: + from unav.core.feature.Global_Extractors import GlobalExtractors + + if not getattr(GlobalExtractors, "__mw_patched__", False): + original_call = GlobalExtractors.__call__ + + @functools.wraps(original_call) + def traced_global_call(self, request_model, images): + with tracer.start_as_current_span(f"unav.global_extractor.{request_model}.model_forward"): + return original_call(self, request_model, images) + + GlobalExtractors.__call__ = 
traced_global_call + GlobalExtractors.__mw_patched__ = True + print("🔧 Patched GlobalExtractors.__call__ for model inference tracing") + except Exception as e: + print(f"⚠️ Failed to patch GlobalExtractors: {e}") + + +def run_monkey_patch_matching_and_ransac(self): + """Patch matching and RANSAC functions.""" + if not hasattr(self, "tracer") or not self.tracer: + return + + tracer = self.tracer + import functools + + try: + from unav.core import feature_filter + + if not getattr(feature_filter, "__mw_patched__", False): + if hasattr(feature_filter, "match_query_to_database"): + original_match = feature_filter.match_query_to_database + + @functools.wraps(original_match) + def traced_match(*args, **kwargs): + with tracer.start_as_current_span("unav.match_query_to_database"): + return original_match(*args, **kwargs) + + feature_filter.match_query_to_database = traced_match + print("🔧 Patched match_query_to_database") + + if hasattr(feature_filter, "ransac_filter"): + original_ransac = feature_filter.ransac_filter + + @functools.wraps(original_ransac) + def traced_ransac(*args, **kwargs): + with tracer.start_as_current_span("unav.ransac_filter"): + return original_ransac(*args, **kwargs) + + feature_filter.ransac_filter = traced_ransac + print("🔧 Patched ransac_filter") + + feature_filter.__mw_patched__ = True + print("🔧 Patched feature_filter module successfully") + except Exception as e: + print(f"⚠️ Failed to patch feature_filter: {e}") diff --git a/src/modal_functions/unav_v2/logic/maps.py b/src/modal_functions/unav_v2/logic/maps.py new file mode 100644 index 0000000..79dc3a5 --- /dev/null +++ b/src/modal_functions/unav_v2/logic/maps.py @@ -0,0 +1,91 @@ +from typing import Any, Dict, Optional, Set + +from .places import run_get_places + + +def run_ensure_maps_loaded( + server: Any, + place: str, + building: Optional[str] = None, + floor: Optional[str] = None, + enable_multifloor: bool = False, +): + """ + Ensure that maps for a specific place/building are loaded. 
+ When building is specified, loads all floors for that building. + Creates selective localizer instances for true lazy loading. + """ + if building: + if enable_multifloor or not floor: + map_key = (place, building) + else: + map_key = (place, building, floor) + else: + map_key = place + + if map_key in server.maps_loaded: + return + + print(f"🔄 [Phase 4] Creating selective localizer for: {map_key}") + + if building: + selective_places = run_get_places( + server, + target_place=place, + target_building=building, + target_floor=floor, + enable_multifloor=enable_multifloor, + ) + else: + selective_places = run_get_places(server, target_place=place) + + if not selective_places: + print( + "⚠️ No matching places found for selective load; skipping localizer creation" + ) + return + + from unav.config import UNavConfig + + selective_config = UNavConfig( + data_final_root=server.DATA_ROOT, + places=selective_places, + global_descriptor_model=server.FEATURE_MODEL, + local_feature_model=server.LOCAL_FEATURE_MODEL, + ) + + from unav.localizer.localizer import UNavLocalizer + import time + + selective_localizer = UNavLocalizer(selective_config.localizer_config) + + if hasattr(server, "tracer") and server.tracer: + try: + server._monkey_patch_localizer_methods(selective_localizer) + except Exception as e: + print(f"⚠️ Failed to patch selective localizer: {e}") + + if hasattr(server, "tracer") and server.tracer: + with server.tracer.start_as_current_span( + "load_maps_and_features_span" + ) as load_span: + load_span.add_event("Starting map and feature loading") + load_span.set_attribute("map_key", str(map_key)) + load_span.set_attribute("selective_places", str(selective_places)) + + start_load_time = time.time() + selective_localizer.load_maps_and_features() + load_duration = time.time() - start_load_time + + load_span.set_attribute("load_duration_seconds", load_duration) + load_span.add_event("Map and feature loading completed") + else: + print(f"⏱️ Starting 
load_maps_and_features for: {map_key}") + start_load_time = time.time() + selective_localizer.load_maps_and_features() + load_duration = time.time() - start_load_time + print(f"⏱️ Completed load_maps_and_features in {load_duration:.2f} seconds") + + server.selective_localizers[map_key] = selective_localizer + server.maps_loaded.add(map_key) + print(f"✅ Selective localizer created and maps loaded for: {map_key}") diff --git a/src/modal_functions/unav_v2/logic/navigation.py b/src/modal_functions/unav_v2/logic/navigation.py new file mode 100644 index 0000000..143cc4c --- /dev/null +++ b/src/modal_functions/unav_v2/logic/navigation.py @@ -0,0 +1,477 @@ +""" +Navigation logic functions - called by endpoints in unav_modal.py +""" +import numpy as np +from typing import Dict, Any, Optional + +from .utils import ( + run_safe_serialize, + run_convert_navigation_to_trajectory, + run_construct_mock_localization_output, + run_set_navigation_context, +) +from .maps import run_ensure_maps_loaded +from .vlm import run_vlm_on_image + + +def run_planner( + self, + session_id: str, + base_64_image, + destination_id: str, + place: str, + building: str, + floor: str, + top_k: int = None, + unit: str = "meter", + language: str = "en", + refinement_queue: dict = None, + is_vlm_extraction_enabled: bool = False, + enable_multifloor: bool = True, + should_use_user_provided_coordinate: bool = False, + x: float = None, + y: float = None, + angle: float = None, + turn_mode: str = "default", +) -> Dict[str, Any]: + """Full localization and navigation pipeline logic.""" + import time + import logging + import uuid + import base64 + import cv2 + + from ..server_methods.helpers import ( + _get_queue_key_for_image_shape, + _update_refinement_queue, + ) + + call_id = str(uuid.uuid4()) + has_tracer = hasattr(self, "tracer") and self.tracer is not None + print(f"📋 [PLANNER] Called with session_id={session_id}, call_id={call_id}, has_tracer={has_tracer}") + print(f"📋 [PLANNER] Params: 
destination_id={destination_id}, place={place}, building={building}, floor={floor}, top_k={top_k}, unit={unit}, language={language}, enable_multifloor={enable_multifloor}, should_use_user_provided_coordinate={should_use_user_provided_coordinate}") + + if has_tracer: + print(f"📋 [PLANNER] Using TRACED execution path for call_id={call_id}") + with self.tracer.start_as_current_span("planner_span") as parent_span: + parent_span.set_attribute("unav.call_id", call_id) + parent_span.set_attribute("unav.session_id", session_id) + start_time = time.time() + timing_data = {} + image = None + + if should_use_user_provided_coordinate: + if x is None or y is None or angle is None: + return {"status": "error", "error": "x, y, angle required", "timing": {"total": (time.time() - start_time) * 1000}} + print(f"📍 Using user-provided coordinates: x={x}, y={y}, angle={angle}°") + if base_64_image is not None: + print("⚠️ Image provided but will be ignored since using provided coordinates") + elif base_64_image is None: + return {"status": "error", "error": "No image provided", "timing": {"total": (time.time() - start_time) * 1000}} + + if not should_use_user_provided_coordinate: + if isinstance(base_64_image, str): + try: + base64_string = base_64_image + if "," in base64_string: + base64_string = base64_string.split(",")[1] + missing_padding = len(base64_string) % 4 + if missing_padding: + base64_string += "=" * (4 - missing_padding) + image_bytes = base64.b64decode(base64_string) + print(f"Received base64 image string of length {len(base64_string)}") + image_array = np.frombuffer(image_bytes, dtype=np.uint8) + image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + if image is None: + return {"status": "error", "error": "Failed to decode image", "timing": {"total": (time.time() - start_time) * 1000}} + except Exception as img_error: + return {"status": "error", "error": f"Error: {str(img_error)}", "timing": {"total": (time.time() - start_time) * 1000}} + elif isinstance(base_64_image, 
np.ndarray): + image = base_64_image + else: + return {"status": "error", "error": f"Unsupported: {type(base_64_image)}", "timing": {"total": (time.time() - start_time) * 1000}} + + try: + import torch + cuda_available = torch.cuda.is_available() + print(f"[GPU DEBUG] torch.cuda.is_available(): {cuda_available}") + if cuda_available: + print(f"[GPU DEBUG] torch.cuda.device_count(): {torch.cuda.device_count()}") + print(f"[GPU DEBUG] torch.cuda.current_device(): {torch.cuda.current_device()}") + print(f"[GPU DEBUG] torch.cuda.get_device_name(0): {torch.cuda.get_device_name(0)}") + + setup_start = time.time() + dest_id = destination_id + target_place = place + target_building = building + target_floor = floor + user_id = session_id + session = self.get_session(user_id) + + if not dest_id: + dest_id = session.get("selected_dest_id") + if not target_place: + target_place = session.get("target_place") + if not target_building: + target_building = session.get("target_building") + if not target_floor: + target_floor = session.get("target_floor") + if unit == "feet": + unit = session.get("unit", "feet") + if language == "en": + language = session.get("language", "en") + + if not all([dest_id, target_place, target_building, target_floor]): + return {"status": "error", "error": "Incomplete navigation context"} + + self.update_session(user_id, {"selected_dest_id": dest_id, "target_place": target_place, "target_building": target_building, "target_floor": target_floor, "unit": unit, "language": language}) + + if refinement_queue is None: + session_refinement_queue = session.get("refinement_queue") or {} + if image is not None and hasattr(image, 'shape'): + queue_key = image.shape[:2] + if queue_key in session_refinement_queue: + refinement_queue = session_refinement_queue[queue_key] + else: + refinement_queue = {} + else: + refinement_queue = {} + + timing_data["setup"] = (time.time() - setup_start) * 1000 + print(f"⏱️ Setup: {timing_data['setup']:.2f}ms") + + if 
should_use_user_provided_coordinate: + print("⏭️ Skipping localization - using provided coordinates") + localization_start = time.time() + output = run_construct_mock_localization_output(x=x, y=y, angle=angle, place=target_place, building=target_building, floor=target_floor) + timing_data["localization"] = (time.time() - localization_start) * 1000 + print(f"⏱️ Mock Localization: {timing_data['localization']:.2f}ms") + else: + localization_start = time.time() + with self.tracer.start_as_current_span("localization_span"): + self.ensure_gpu_components_ready() + with self.tracer.start_as_current_span("load_maps_span"): + run_ensure_maps_loaded( + server=self, + place=target_place, + building=target_building, + floor=target_floor, + enable_multifloor=enable_multifloor, + ) + + map_key = (target_place, target_building) + localizer_to_use = self.selective_localizers.get(map_key) + if not localizer_to_use and target_floor: + floor_key = (target_place, target_building, target_floor) + localizer_to_use = self.selective_localizers.get(floor_key, self.localizer) + else: + localizer_to_use = localizer_to_use or self.localizer + + queue_key = _get_queue_key_for_image_shape(image.shape) + is_cold_start = len(refinement_queue) == 0 + print(f"🔍 Cold start: {is_cold_start}, refinement_queue size: {len(refinement_queue)}") + + if is_cold_start: + bootstrap_outputs = [] + empty_queue = refinement_queue.copy() + for bootstrap_pass in range(2): + print(f"🔄 Bootstrap pass {bootstrap_pass + 1}/2...") + bootstrap_output = localizer_to_use.localize(image, empty_queue, top_k=top_k) + if bootstrap_output and bootstrap_output.get("success"): + bootstrap_outputs.append(bootstrap_output) + best_map_key = bootstrap_output.get("best_map_key") + print(f" ✅ Pass {bootstrap_pass + 1}: best_map_key={best_map_key}") + new_queue = bootstrap_output.get("refinement_queue", {}) + if best_map_key and new_queue: + empty_queue = _update_refinement_queue(empty_queue, best_map_key, queue_key, 
new_queue.get(best_map_key, {}).get(queue_key, {"pairs": [], "initial_poses": [], "pps": []})) + + if len(bootstrap_outputs) >= 2: + xy_sum = [0.0, 0.0] + ang_sum = 0.0 + for bo in bootstrap_outputs: + fp = bo.get("floorplan_pose", {}) + xy = fp.get("xy", [0, 0]) + xy_sum[0] += xy[0] + xy_sum[1] += xy[1] + ang_sum += fp.get("ang", 0) + avg_xy = [xy_sum[0] / len(bootstrap_outputs), xy_sum[1] / len(bootstrap_outputs)] + avg_ang = ang_sum / len(bootstrap_outputs) + output = bootstrap_outputs[-1].copy() + output["floorplan_pose"] = {"xy": avg_xy, "ang": avg_ang} + output["bootstrap_mode"] = "mean_all_passes" + output["bootstrap_passes"] = len(bootstrap_outputs) + elif bootstrap_outputs: + output = bootstrap_outputs[-1] + output["bootstrap_mode"] = "single_pass" + else: + output = localizer_to_use.localize(image, refinement_queue, top_k=top_k) + output["bootstrap_mode"] = "none" + else: + output = localizer_to_use.localize(image, refinement_queue, top_k=top_k) + output["bootstrap_mode"] = "none" + + output["map_scope"] = "building_level_multifloor" if enable_multifloor else "floor_locked" + output["queue_key"] = queue_key + + timing_data["localization"] = (time.time() - localization_start) * 1000 + print(f"⏱️ Localization: {timing_data['localization']:.2f}ms") + print(f"📍 Localization result: floorplan_pose={output.get('floorplan_pose')}, map_key={output.get('best_map_key')}, map_scope={output.get('map_scope')}") + + if output is None or "floorplan_pose" not in output: + print("❌ Localization failed, no pose found.") + if is_vlm_extraction_enabled: + try: + extracted_text = run_vlm_on_image(server=self, image=image) + return {"status": "error", "error": "Localization failed", "extracted_text": extracted_text, "timing": timing_data} + except Exception as vlm_error: + print(f"❌ Error during VLM fallback: {vlm_error}") + return {"status": "error", "error": "VLM failed", "vlm_error": str(vlm_error), "timing": timing_data} + return {"status": "error", "error": "Localization 
failed", "timing": timing_data} + + processing_start = time.time() + floorplan_pose = output["floorplan_pose"] + start_xy, start_heading = floorplan_pose["xy"], -floorplan_pose["ang"] + source_key = output["best_map_key"] + start_place, start_building, start_floor = source_key + + if image is not None and hasattr(image, 'shape'): + queue_key = image.shape[:2] + current_session_queue = session.get("refinement_queue") or {} + current_session_queue[queue_key] = output.get("refinement_queue", {}) + else: + current_session_queue = {} + self.update_session(user_id, {"current_place": start_place, "current_building": start_building, "current_floor": start_floor, "floorplan_pose": floorplan_pose, "refinement_queue": current_session_queue}) + + try: + dest_id_for_path = int(dest_id) + except (ValueError, TypeError): + dest_id_for_path = dest_id + + timing_data["processing"] = (time.time() - processing_start) * 1000 + print(f"⏱️ Processing: {timing_data['processing']:.2f}ms") + + path_planning_start = time.time() + with self.tracer.start_as_current_span("path_planning_span"): + result = self.nav.find_path(start_place, start_building, start_floor, start_xy, target_place, target_building, target_floor, dest_id_for_path) + + timing_data["path_planning"] = (time.time() - path_planning_start) * 1000 + print(f"⏱️ Path Planning: {timing_data['path_planning']:.2f}ms") + + if result is None or (isinstance(result, dict) and "error" in result): + return {"status": "error", "error": "Path planning failed", "timing": timing_data} + + command_generation_start = time.time() + with self.tracer.start_as_current_span("command_generation_span"): + cmds = self.commander( + self.nav, + result, + initial_heading=start_heading, + unit=unit, + language=language, + turn_mode=turn_mode, + ) + + timing_data["command_generation"] = (time.time() - command_generation_start) * 1000 + + serialization_start = time.time() + serialized_result = run_safe_serialize(result) + serialized_cmds = 
def run_localize_user(
    self,
    session_id: str,
    base_64_image,
    place: str,
    building: str,
    floor: str,
    top_k: int = None,
    refinement_queue: dict = None,
    enable_multifloor: bool = True,
) -> Dict[str, Any]:
    """Localize the user from one image, without planning a route.

    Args:
        self: Server object. This function reads ``tracer`` (optional),
            ``selective_localizers`` and ``localizer`` from it and calls its
            ``ensure_gpu_components_ready``, ``get_session`` and
            ``update_session`` methods.
        session_id: Session whose stored refinement queue is read and updated.
        base_64_image: Base64 string (a ``,``-separated data-URL prefix is
            stripped) or an already decoded ``np.ndarray``.
        place: Place name used to load maps and pick a localizer.
        building: Building name used to load maps and pick a localizer.
        floor: Floor name used to load maps and pick a localizer.
        top_k: Forwarded unchanged to ``localizer.localize``.
        refinement_queue: Queue to localize with. When ``None``, the queue
            stored in the session for this image shape is used.
        enable_multifloor: Forwarded to map loading and reported as
            ``map_scope`` in the debug info.

    Returns:
        A dict whose ``status`` is ``"success"`` (with ``floorplan_pose``,
        ``best_map_key``, ``timing`` and ``debug_info``) or ``"error"`` (with
        ``error`` and ``timing``). Exceptions are reported as error dicts
        instead of being raised.
    """
    import base64
    import time
    from contextlib import nullcontext

    import cv2

    from ..server_methods.helpers import _get_queue_key_for_image_shape

    print(f"📋 [LOCALIZE_USER] Called with session_id={session_id}")
    print(f"📋 [LOCALIZE_USER] Params: place={place}, building={building}, floor={floor}, top_k={top_k}, enable_multifloor={enable_multifloor}")
    start_time = time.time()

    def _failure(message: str, **extra: Any) -> Dict[str, Any]:
        # Every error response carries the elapsed time in milliseconds.
        return {
            "status": "error",
            "error": message,
            **extra,
            "timing": {"total": (time.time() - start_time) * 1000},
        }

    if base_64_image is None:
        return _failure("No image provided")

    if isinstance(base_64_image, str):
        try:
            base64_string = base_64_image
            if "," in base64_string:
                # Drop a "data:...;base64," style prefix.
                base64_string = base64_string.split(",")[1]
            missing_padding = len(base64_string) % 4
            if missing_padding:
                base64_string += "=" * (4 - missing_padding)
            image_bytes = base64.b64decode(base64_string)
            print(f"Received base64 image string of length {len(base64_string)}")
            image_array = np.frombuffer(image_bytes, dtype=np.uint8)
            image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
        except Exception as img_error:
            return _failure(f"Error: {str(img_error)}")
        if image is None:
            return _failure("Failed to decode image")
    elif isinstance(base_64_image, np.ndarray):
        image = base_64_image
    else:
        return _failure(f"Unsupported: {type(base_64_image)}")

    try:
        has_tracer = hasattr(self, "tracer") and self.tracer is not None
        # Tracing is observability only: without a tracer the exact same
        # pipeline runs inside a no-op context.
        span_context = (
            self.tracer.start_as_current_span("localize_user_span")
            if has_tracer
            else nullcontext()
        )

        with span_context as span:
            if span is not None:
                span.set_attribute("unav.session_id", session_id)

            self.ensure_gpu_components_ready()
            run_ensure_maps_loaded(
                server=self,
                place=place,
                building=building,
                floor=floor,
                enable_multifloor=enable_multifloor,
            )

            # Prefer the building-level localizer, then the floor-level one,
            # and finally the server-wide default.
            localizer = self.selective_localizers.get((place, building))
            if not localizer and floor:
                localizer = self.selective_localizers.get((place, building, floor))
            localizer = localizer or self.localizer

            queue_key = _get_queue_key_for_image_shape(image.shape)

            # The session is always fetched: it seeds the queue when the
            # caller gave none, and it is needed below to persist the result
            # whichever way the queue was obtained.
            session = self.get_session(session_id)
            session_queues = session.get("refinement_queue") or {}
            if refinement_queue is None:
                refinement_queue = session_queues.get(queue_key) or {}

            is_cold_start = len(refinement_queue) == 0
            print(f"🔍 Cold start: {is_cold_start}, refinement_queue size: {len(refinement_queue)}")

            output = None
            bootstrap_mode = "none"

            if is_cold_start:
                bootstrap_outputs = []
                empty_queue = refinement_queue.copy()
                for bootstrap_pass in range(2):
                    print(f"🔄 Bootstrap pass {bootstrap_pass + 1}/2...")
                    bootstrap_output = localizer.localize(image, empty_queue, top_k=top_k)
                    if bootstrap_output and bootstrap_output.get("success"):
                        bootstrap_outputs.append(bootstrap_output)
                        best_map_key = bootstrap_output.get("best_map_key")
                        print(f" ✅ Pass {bootstrap_pass + 1}: best_map_key={best_map_key}")

                if len(bootstrap_outputs) >= 2:
                    # NOTE(review): headings are averaged arithmetically,
                    # which is wrong across the wrap-around (e.g. 359 and 1)
                    # - confirm whether a circular mean is needed.
                    xy_sum = [0.0, 0.0]
                    ang_sum = 0.0
                    for bo in bootstrap_outputs:
                        fp = bo.get("floorplan_pose", {})
                        xy = fp.get("xy", [0, 0])
                        xy_sum[0] += xy[0]
                        xy_sum[1] += xy[1]
                        ang_sum += fp.get("ang", 0)
                    passes = len(bootstrap_outputs)
                    output = bootstrap_outputs[-1].copy()
                    output["floorplan_pose"] = {
                        "xy": [xy_sum[0] / passes, xy_sum[1] / passes],
                        "ang": ang_sum / passes,
                    }
                    bootstrap_mode = "mean_all_passes"
                elif bootstrap_outputs:
                    output = bootstrap_outputs[-1]
                    bootstrap_mode = "single_pass"

            if output is None:
                # Warm start, or no bootstrap pass succeeded.
                output = localizer.localize(image, refinement_queue, top_k=top_k)

            # Annotate only a real result; a None result is reported as a
            # localization failure below instead of raising TypeError here.
            if output is not None:
                output["bootstrap_mode"] = bootstrap_mode
                output["map_scope"] = "building_level_multifloor" if enable_multifloor else "floor_locked"
                output["queue_key"] = queue_key

        if output is None or "floorplan_pose" not in output:
            return _failure("Localization failed")

        floorplan_pose = output["floorplan_pose"]
        best_map_key = output["best_map_key"]

        # Persist the refreshed queue under this image shape for later calls.
        session_queues[queue_key] = output.get("refinement_queue", {})
        self.update_session(
            session_id,
            {
                "current_place": best_map_key[0],
                "current_building": best_map_key[1],
                "current_floor": best_map_key[2],
                "floorplan_pose": floorplan_pose,
                "refinement_queue": session_queues,
            },
        )

        timing_data = {"total": (time.time() - start_time) * 1000}
        print(f"⏱️ Localization total: {timing_data['total']:.2f}ms")
        print(f"📍 Result: floorplan_pose={floorplan_pose}, best_map_key={best_map_key}")

        return {
            "status": "success",
            "floorplan_pose": run_safe_serialize(floorplan_pose),
            "best_map_key": run_safe_serialize(best_map_key),
            "timing": timing_data,
            "debug_info": {
                "map_scope": output.get("map_scope"),
                "bootstrap_mode": output.get("bootstrap_mode"),
                "queue_key": output.get("queue_key"),
            },
        }

    except Exception as e:
        import traceback

        traceback.print_exc()
        return _failure(str(e), type=type(e).__name__)
def run_get_fallback_places():
    """Return the hardcoded place -> building -> floors fallback mapping.

    A fresh dictionary with fresh floor lists is built on every call.
    """
    fallback_catalog = (
        ("New_York_City", "LightHouse", ("3_floor", "4_floor", "6_floor")),
        ("New_York_University", "Langone", ("15_floor", "16_floor", "17_floor")),
        ("Mahidol_University", "Jubilee", ("fl1", "fl2", "fl3")),
    )
    return {
        place_name: {building_name: list(floor_names)}
        for place_name, building_name, floor_names in fallback_catalog
    }
def run_convert_navigation_to_trajectory(
    navigation_result: Dict[str, Any],
    scale: float = 0.02205862195,
) -> Dict[str, Any]:
    """Convert a navigation result into the trajectory output format.

    Args:
        navigation_result: Dict that may contain ``result`` (a dict with
            ``path_coords``), ``cmds``, ``best_map_key`` (place, building,
            floor) and ``floorplan_pose`` (``xy`` and ``ang``). Missing or
            ``None`` entries are treated as empty.
        scale: Value written to both ``scale`` fields of the output. The
            default is the constant this function has always emitted.
            NOTE(review): presumably a floorplan-unit to real-world factor -
            confirm before passing other values.

    Returns:
        ``{"trajectory": [{"0": segment}, None], "scale": scale}`` where
        ``segment`` holds the building, floor, waypoint list, instructions
        and scale. The first waypoint is the start pose, followed by every
        path coordinate that has at least two components.
    """
    result = navigation_result.get("result") or {}
    cmds = navigation_result.get("cmds") or []
    best_map_key = navigation_result.get("best_map_key") or []
    floorplan_pose = navigation_result.get("floorplan_pose") or {}

    building = best_map_key[1] if len(best_map_key) > 1 else ""
    floor = best_map_key[2] if len(best_map_key) > 2 else ""

    path_coords = (result.get("path_coords") or []) if isinstance(result, dict) else []

    start_xy = floorplan_pose.get("xy") or []
    start_ang = floorplan_pose.get("ang")

    paths = []
    if len(start_xy) >= 2:
        if start_ang is not None:
            # A heading of exactly 0 is a real heading, so test for presence
            # rather than truthiness.
            paths.append([start_xy[0], start_xy[1], start_ang])
        else:
            # Copy so the output never aliases the caller's pose list.
            paths.append(list(start_xy))

    paths.extend(coord for coord in path_coords if len(coord) >= 2)

    return {
        "trajectory": [
            {
                "0": {
                    "name": "destination",
                    "building": building,
                    "floor": floor,
                    "paths": paths,
                    "command": {
                        "instructions": cmds,
                        "are_instructions_generated": len(cmds) > 0,
                    },
                    "scale": scale,
                }
            },
            None,
        ],
        "scale": scale,
    }
+ print(f"❌ {error_msg}") + return error_msg + + client = genai.Client(api_key=GEMINI_API_KEY) + GEMINI_MODEL = "gemini-2.5-flash" + + image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + _, image_bytes = cv2.imencode(".jpg", image_rgb) + image_bytes = image_bytes.tobytes() + + prompt = """Analyze this image and extract all visible text content. + Please provide: + 1. All readable text, signs, labels, and written content + 2. Any numbers, codes, or identifiers visible + 3. Location descriptions or directional information if present + + Format the response as clear, readable text without extra formatting.""" + + response = client.models.generate_content( + model=GEMINI_MODEL, + contents=[ + prompt, + types.Part.from_bytes( + data=image_bytes, mime_type="image/jpeg" + ), + ], + ) + + extracted_text = ( + response.text if response.text else "No text extracted" + ) + + print( + f"✅ VLM extraction successful: {len(extracted_text)} characters extracted" + ) + + return extracted_text + + except ImportError as e: + error_msg = f"Missing required library for VLM: {str(e)}. 
Please install: pip install google-genai" + print(f"❌ {error_msg}") + return error_msg + except Exception as e: + error_msg = f"VLM extraction failed: {str(e)}" + print(f"❌ {error_msg}") + return error_msg diff --git a/src/modal_functions/unav_v2/media/image.png b/src/modal_functions/unav_v2/media/image.png new file mode 100644 index 0000000..faf00e2 Binary files /dev/null and b/src/modal_functions/unav_v2/media/image.png differ diff --git a/src/modal_functions/unav_v2/media/image2.png b/src/modal_functions/unav_v2/media/image2.png new file mode 100644 index 0000000..2dc37ae Binary files /dev/null and b/src/modal_functions/unav_v2/media/image2.png differ diff --git a/src/modal_functions/unav_v2/media/vinay_sample.jpeg b/src/modal_functions/unav_v2/media/vinay_sample.jpeg new file mode 100644 index 0000000..ffafcc0 Binary files /dev/null and b/src/modal_functions/unav_v2/media/vinay_sample.jpeg differ diff --git a/src/modal_functions/unav_v2/server_methods/__init__.py b/src/modal_functions/unav_v2/server_methods/__init__.py new file mode 100644 index 0000000..ce90d8e --- /dev/null +++ b/src/modal_functions/unav_v2/server_methods/__init__.py @@ -0,0 +1,11 @@ +from .helpers import ( + _get_queue_key_for_image_shape, + _get_refinement_queue_for_map, + _update_refinement_queue, +) + +__all__ = [ + "_get_queue_key_for_image_shape", + "_get_refinement_queue_for_map", + "_update_refinement_queue", +] diff --git a/src/modal_functions/unav_v2/server_methods/helpers.py b/src/modal_functions/unav_v2/server_methods/helpers.py new file mode 100644 index 0000000..5f4253e --- /dev/null +++ b/src/modal_functions/unav_v2/server_methods/helpers.py @@ -0,0 +1,26 @@ +from typing import Dict, Any + + +def _get_queue_key_for_image_shape(image_shape): + """Get a queue key based on image shape - returns tuple like unav-server for consistency.""" + if image_shape is None: + return None + return image_shape[:2] # Returns tuple (h, w) - matches unav-server behavior + + +def 
_get_refinement_queue_for_map(queue_dict, map_key, queue_key): + """Get the refinement queue for a specific map_key and queue_key (image shape bucket).""" + if map_key not in queue_dict: + return {"pairs": [], "initial_poses": [], "pps": []} + map_queues = queue_dict[map_key] + if queue_key not in map_queues: + return {"pairs": [], "initial_poses": [], "pps": []} + return map_queues[queue_key] + + +def _update_refinement_queue(queue_dict, map_key, queue_key, new_queue_state): + """Update the refinement queue for a specific map_key and queue_key.""" + if map_key not in queue_dict: + queue_dict[map_key] = {} + queue_dict[map_key][queue_key] = new_queue_state + return queue_dict diff --git a/src/modal_functions/unav_v2/test_modal_functions.py b/src/modal_functions/unav_v2/test_modal_functions.py index 7912ad4..3534ac4 100644 --- a/src/modal_functions/unav_v2/test_modal_functions.py +++ b/src/modal_functions/unav_v2/test_modal_functions.py @@ -8,12 +8,12 @@ def main(): # Common parameters - BUILDING = "Jubilee" - PLACE = "Mahidol_University" - FLOOR = "fl2" - DESTINATION_ID = "32" + BUILDING = "Langone" + PLACE = "New_York_University" + FLOOR = "17_floor" + DESTINATION_ID = "52" SESSION_ID = "test_session_id_2" - IMAGE_PATH = "media/sample_image_20.jpg" + IMAGE_PATH = "media/vinay_sample.jpeg" try: UnavServer = modal.Cls.lookup("unav-server-v21", "UnavServer") @@ -54,6 +54,7 @@ def main(): building=BUILDING, floor=FLOOR, place=PLACE, + enable_multifloor=False, ) print("Planner Result:", planner_result) diff --git a/src/modal_functions/unav_v2/unav_modal.py b/src/modal_functions/unav_v2/unav_modal.py index 2dcbdc4..daacd05 100644 --- a/src/modal_functions/unav_v2/unav_modal.py +++ b/src/modal_functions/unav_v2/unav_modal.py @@ -2,13 +2,23 @@ import json import traceback import numpy as np -import json import os from typing import Dict, List, Any, Optional from .deploy_config import get_scaledown_window, get_gpu_config, get_memory_mb from .modal_config import app, 
unav_image, volume, gemini_secret, middleware_secret from .destinations_service import get_destinations_list_impl +from .logic import ( + run_planner, + run_localize_user, + run_init_middleware, + run_init_cpu_components, + run_init_gpu_components, + run_monkey_patch_localizer_methods, + run_monkey_patch_pose_refinement, + run_monkey_patch_feature_extractors, + run_monkey_patch_matching_and_ransac, +) @app.cls( @@ -28,1001 +38,41 @@ class UnavServer: _middleware_init_pending: bool = False def _gpu_available(self) -> bool: - """Utility to detect whether CUDA GPUs are currently accessible.""" - try: - import torch - - available = torch.cuda.is_available() - print(f"[GPU CHECK] torch.cuda.is_available(): {available}") - return available - except Exception as exc: - print(f"[GPU CHECK] Unable to determine GPU availability: {exc}") - return False + return True def _configure_middleware_tracing(self): - print("🔧 [CONFIGURE] Starting Middleware.io configuration...") - from middleware import mw_tracker, MWOptions - from opentelemetry import trace - import os - - api_key = os.environ.get("MW_API_KEY") - target = os.environ.get("MW_TARGET") - - if not api_key or not target: - print( - "⚠️ Warning: MW_API_KEY and MW_TARGET not set. Skipping middleware initialization." 
- ) - self.tracer = None - return - - try: - mw_tracker( - MWOptions( - access_token=api_key, - target=target, - service_name="UNav-Server", - console_exporter=False, - log_level="INFO", - collect_profiling=True, - collect_traces=True, - collect_metrics=True, - ) - ) - - self.tracer = trace.get_tracer(__name__) - print("✅ Middleware.io initialized successfully") - print(f"✅ [CONFIGURE] Tracer created: {type(self.tracer).__name__}") - except Exception as e: - print(f"⚠️ Warning: Failed to initialize Middleware.io: {e}") - self.tracer = None - print(f"⚠️ [CONFIGURE] Tracer set to None due to error") + pass @enter(snap=False) def initialize_middleware(self): - """ - Initialize Middleware.io tracking for profiling and telemetry. - """ - print("🔧 [Phase 0] Initializing Middleware.io...") - print( - f"🔧 [Phase 0] Current tracer state: {getattr(self, 'tracer', 'NOT_SET')}" - ) - print( - f"🔧 [Phase 0] Middleware init pending: {getattr(self, '_middleware_init_pending', 'NOT_SET')}" - ) - - if not self._gpu_available(): - print( - "⏸️ GPU not yet available; deferring Middleware.io initialization until a GPU-backed container is ready." - ) - self._middleware_init_pending = True - self.tracer = None - print(f"🔧 [Phase 0] Set _middleware_init_pending=True, tracer=None") - return - - self._middleware_init_pending = False - print("✅ GPU available; proceeding with Middleware.io initialization") - self._configure_middleware_tracing() + """Delegated to logic.init""" + run_init_middleware(self) @enter(snap=False) def initialize_cpu_components(self): - """ - Initialize CPU-only components that can be safely snapshotted. - This includes configuration, data loading, and navigation setup. 
- """ - print("🚀 [Phase 1] Initializing CPU components for snapshotting...") - - from unav.config import UNavConfig - from unav.navigator.multifloor import FacilityNavigator - from unav.navigator.commander import commands_from_result - - # Configuration constants - self.DATA_ROOT = "/root/UNav-IO/data" - self.FEATURE_MODEL = "DinoV2Salad" - self.LOCAL_FEATURE_MODEL = "superpoint+lightglue" - self.PLACES = self.get_places() # Load all places but defer map loading - - print("🔧 Initializing UNavConfig...") - self.config = UNavConfig( - data_final_root=self.DATA_ROOT, - places=self.PLACES, - global_descriptor_model=self.FEATURE_MODEL, - local_feature_model=self.LOCAL_FEATURE_MODEL, - ) - print("✅ UNavConfig initialized successfully") - - # Extract specific sub-configs for localization and navigation modules - self.localizor_config = self.config.localizer_config - self.navigator_config = self.config.navigator_config - print("✅ Config objects extracted successfully") - - print("🧭 Initializing FacilityNavigator (CPU-only)...") - self.nav = FacilityNavigator(self.navigator_config) - print("✅ FacilityNavigator initialized successfully") - - # Store commander function for navigation - self.commander = commands_from_result - - # Initialize loaded places tracking (all places are now in config, but maps not loaded) - self.maps_loaded = set() - - # Cache for selective localizers (key: (place, building, floor) -> localizer instance) - self.selective_localizers = {} - - # Set flag to indicate CPU components are ready - self.cpu_components_initialized = True - print("📸 CPU components ready for snapshotting!") + """Delegated to logic.init""" + run_init_cpu_components(self) @enter(snap=False) def initialize_gpu_components(self): - """ - Initialize GPU-dependent components that cannot be snapshotted. - This must run after snapshot restoration on GPU-enabled containers. 
- """ - print("🚀 [Phase 2] Initializing GPU components after snapshot restoration...") - - # --- GPU DEBUG INFO --- - try: - import torch - - cuda_available = torch.cuda.is_available() - print(f"[GPU DEBUG] torch.cuda.is_available(): {cuda_available}") - - if not cuda_available: - print( - "[GPU ERROR] CUDA not available! This will cause model loading to fail." - ) - print( - "[GPU ERROR] Modal should have allocated a GPU. Raising exception to trigger retry..." - ) - raise RuntimeError( - "GPU not available when required. Modal will retry with GPU allocation." - ) - - print(f"[GPU DEBUG] torch.cuda.device_count(): {torch.cuda.device_count()}") - print( - f"[GPU DEBUG] torch.cuda.current_device(): {torch.cuda.current_device()}" - ) - print( - f"[GPU DEBUG] torch.cuda.get_device_name(0): {torch.cuda.get_device_name(0)}" - ) - except Exception as gpu_debug_exc: - print(f"[GPU DEBUG] Error printing GPU info: {gpu_debug_exc}") - # If it's our intentional GPU check failure, re-raise it - if "GPU not available when required" in str(gpu_debug_exc): - raise - # --- END GPU DEBUG INFO --- - - # Ensure CPU components are initialized - if not hasattr(self, "cpu_components_initialized"): - print("⚠️ CPU components not initialized, initializing now...") - self.initialize_cpu_components() - - from unav.localizer.localizer import UNavLocalizer - - print("🤖 Initializing UNavLocalizer (GPU-dependent)...") - self.localizer = UNavLocalizer(self.localizor_config) - - # Add fine-grained tracing to internal localizer steps without editing the package - #(monkey-patch key methods if available) - try: - self._monkey_patch_localizer_methods(self.localizer) - self._monkey_patch_pose_refinement() - #self._monkey_patch_matching_and_ransac() - self._monkey_patch_feature_extractors() - except Exception as e: - print(f"⚠️ Failed to monkey-patch UNavLocalizer methods: {e}") - - # Skip loading all maps/features at startup - will load on demand - print("✅ UNavLocalizer initialized (maps will load on 
demand)") - - # Set flag to indicate full system is ready - self.gpu_components_initialized = True - print("🎉 Full UNav system initialization complete! Ready for fast inference.") - print( - f"🎉 [Phase 2] Checking for deferred middleware init: _middleware_init_pending={getattr(self, '_middleware_init_pending', 'NOT_SET')}" - ) - - if getattr(self, "_middleware_init_pending", False): - print( - "🔁 GPU acquired; completing deferred Middleware.io initialization..." - ) - print( - f"🔁 [Phase 2] Before deferred init: tracer={getattr(self, 'tracer', 'NOT_SET')}" - ) - self._middleware_init_pending = False - self._configure_middleware_tracing() - # Re-apply patches with the new tracer - try: - self._monkey_patch_localizer_methods(self.localizer) - self._monkey_patch_pose_refinement() - self._monkey_patch_matching_and_ransac() - self._monkey_patch_feature_extractors() - except Exception as e: - print(f"⚠️ Failed to re-patch after deferred init: {e}") - print( - f"🔁 [Phase 2] After deferred init: tracer={getattr(self, 'tracer', 'NOT_SET')}, _middleware_init_pending={getattr(self, '_middleware_init_pending', 'NOT_SET')}" - ) - else: - print("✅ [Phase 2] No deferred middleware initialization needed") - - def _monkey_patch_localizer_methods( - self, localizer, method_names: Optional[list] = None - ): - """ - Add spans to a set of internal UNavLocalizer methods by monkey-patching them. - - Args: - localizer: the UNavLocalizer instance to patch - method_names: optional list of method names to patch; if None, a conservative - default list is used and we also try to discover other candidates. 
- """ - import os + """Delegated to logic.init""" + run_init_gpu_components(self) - if not hasattr(self, "tracer") or not self.tracer: - return - - import functools - import inspect - import asyncio - - tracer = self.tracer - - # Target the ACTUAL UNavLocalizer methods from the localize() pipeline - # Main pipeline methods - default_candidates = [ - "extract_query_features", - "vpr_retrieve", - "get_candidates_data", - "batch_local_matching_and_ransac", - "multi_frame_pose_refine", - "transform_pose_to_floorplan", - ] - - # Additional internal components that are called by the pipeline methods - # These will show as child spans under their parent methods - internal_components = [ - "global_extractor", # Called by extract_query_features - "local_extractor", # Called by extract_query_features - "local_matcher", # Called by batch_local_matching_and_ransac - ] - - # Allow overriding the names via env var, e.g. MW_UNAV_TRACE_METHODS=extract_query_features,vpr_retrieve - override = os.getenv("MW_UNAV_TRACE_METHODS") - if override: - method_names = [m.strip() for m in override.split(",") if m.strip()] - else: - # Combine both pipeline methods and internal components - method_names = method_names or (default_candidates + internal_components) - - def _wrap(orig, name): - # don't double-wrap - if getattr(orig, "__mw_wrapped__", False): - return orig - - if inspect.iscoroutinefunction(orig): - - async def _async_wrapper(*args, **kwargs): - with tracer.start_as_current_span(f"unav.{name}") as span: - try: - return await orig(*args, **kwargs) - except Exception as exc: - span.record_exception(exc) - raise - - _async_wrapper.__mw_wrapped__ = True - return functools.wraps(orig)(_async_wrapper) - - else: - - def _sync_wrapper(*args, **kwargs): - with tracer.start_as_current_span(f"unav.{name}") as span: - try: - return orig(*args, **kwargs) - except Exception as exc: - span.record_exception(exc) - raise - - _sync_wrapper.__mw_wrapped__ = True - return 
functools.wraps(orig)(_sync_wrapper) - - patched = [] - - # Patch all methods explicitly listed (these are the exact UNavLocalizer methods) - for mname in method_names: - if hasattr(localizer, mname): - try: - orig = getattr(localizer, mname) - wrapped = _wrap(orig, mname) - setattr(localizer, mname, wrapped) - patched.append(mname) - except Exception as e: - print(f"⚠️ Failed to patch {mname}: {e}") - continue - - if patched: - print(f"🔧 Patched localizer methods for tracing: {patched}") - else: - print( - f"⚠️ Warning: No methods were patched. Available methods: {[m for m in dir(localizer) if not m.startswith('_')]}" - ) + def _monkey_patch_localizer_methods(self, localizer, method_names=None): + """Delegated to logic.init""" + run_monkey_patch_localizer_methods(self, localizer, method_names) def _monkey_patch_pose_refinement(self): - """ - Patch the child libraries (poselib, pyimplicitdist) that refine_pose_from_queue calls. - This is cleaner than rewriting the entire function and traces the actual bottlenecks. 
- """ - if not hasattr(self, "tracer") or not self.tracer: - return - - import functools - - tracer = self.tracer - - # Patch poselib functions - try: - import poselib - - # Check if already patched - if not getattr(poselib, "__mw_patched__", False): - original_estimate = poselib.estimate_1D_radial_absolute_pose - - @functools.wraps(original_estimate) - def traced_estimate(*args, **kwargs): - with tracer.start_as_current_span( - "unav.poselib.estimate_1D_radial" - ): - return original_estimate(*args, **kwargs) - - poselib.estimate_1D_radial_absolute_pose = traced_estimate - poselib.__mw_patched__ = True - print("🔧 Patched poselib.estimate_1D_radial_absolute_pose") - except Exception as e: - print(f"⚠️ Failed to patch poselib: {e}") - - # Patch pyimplicitdist functions - try: - import pyimplicitdist - - # Check if already patched - if not getattr(pyimplicitdist, "__mw_patched__", False): - # Patch pose_refinement_1D_radial - original_refine_1d = pyimplicitdist.pose_refinement_1D_radial - - @functools.wraps(original_refine_1d) - def traced_refine_1d(*args, **kwargs): - with tracer.start_as_current_span( - "unav.pyimplicitdist.pose_refinement_1D_radial" - ): - return original_refine_1d(*args, **kwargs) - - pyimplicitdist.pose_refinement_1D_radial = traced_refine_1d - - # Patch build_cost_matrix_multi - original_build_cm = pyimplicitdist.build_cost_matrix_multi - - @functools.wraps(original_build_cm) - def traced_build_cm(*args, **kwargs): - with tracer.start_as_current_span( - "unav.pyimplicitdist.build_cost_matrix_multi" - ): - return original_build_cm(*args, **kwargs) - - pyimplicitdist.build_cost_matrix_multi = traced_build_cm - - # Patch pose_refinement_multi - original_refine_multi = pyimplicitdist.pose_refinement_multi - - @functools.wraps(original_refine_multi) - def traced_refine_multi(*args, **kwargs): - with tracer.start_as_current_span( - "unav.pyimplicitdist.pose_refinement_multi" - ): - return original_refine_multi(*args, **kwargs) - - 
pyimplicitdist.pose_refinement_multi = traced_refine_multi - - pyimplicitdist.__mw_patched__ = True - print( - "🔧 Patched pyimplicitdist functions (pose_refinement_1D_radial, build_cost_matrix_multi, pose_refinement_multi)" - ) - except Exception as e: - print(f"⚠️ Failed to patch pyimplicitdist: {e}") + """Delegated to logic.init""" + run_monkey_patch_pose_refinement(self) def _monkey_patch_feature_extractors(self): - """ - Patch the feature extraction pipeline to add granular tracing. - Patches: - 1. extract_query_features function to trace preprocessing/postprocessing - 2. GlobalExtractors.__call__ to trace model inference - 3. Superpoint.extract_local_features to trace local extraction - """ - if not hasattr(self, "tracer") or not self.tracer: - return - - import functools - - tracer = self.tracer - - # Patch the extract_query_features function from unav.localizer.tools.feature_extractor - try: - from unav.localizer.tools import feature_extractor - - if not getattr(feature_extractor, "__mw_patched__", False): - original_extract = feature_extractor.extract_query_features - - @functools.wraps(original_extract) - def traced_extract_query_features( - query_img, - global_extractor, - local_extractor, - global_model_name, - device, - ): - # Simply wrap the original function - no code rewriting - with tracer.start_as_current_span("unav.extract_query_features"): - return original_extract( - query_img, - global_extractor, - local_extractor, - global_model_name, - device, - ) - - feature_extractor.extract_query_features = traced_extract_query_features - feature_extractor.__mw_patched__ = True - print("🔧 Patched extract_query_features with tracing wrapper") - except Exception as e: - print(f"⚠️ Failed to patch extract_query_features: {e}") - import traceback - - traceback.print_exc() - - # Patch GlobalExtractors.__call__ to trace the actual model forward pass - try: - from unav.core.feature.Global_Extractors import GlobalExtractors - - if not getattr(GlobalExtractors, 
"__mw_patched__", False): - original_call = GlobalExtractors.__call__ - - @functools.wraps(original_call) - def traced_global_call(self, request_model, images): - with tracer.start_as_current_span( - f"unav.global_extractor.{request_model}.model_forward" - ): - result = original_call(self, request_model, images) - return result - - GlobalExtractors.__call__ = traced_global_call - GlobalExtractors.__mw_patched__ = True - print( - "🔧 Patched GlobalExtractors.__call__ for model inference tracing" - ) - except Exception as e: - print(f"⚠️ Failed to patch GlobalExtractors: {e}") - import traceback - - traceback.print_exc() - - # Patch Superpoint.extract_local_features to trace preprocessing and model inference - # Try multiple import paths since the module structure may vary - superpoint_patched = False - import_paths_to_try = [ - ("unav.core.feature.local_extractor", "Superpoint"), - ("unav.core.third_party.SuperPoint_SuperGlue.base_model", "Superpoint"), - ] - - for module_path, class_name in import_paths_to_try: - if superpoint_patched: - break - try: - import importlib - - module = importlib.import_module(module_path) - if hasattr(module, class_name): - Superpoint = getattr(module, class_name) - print( - f"🔍 [DEBUG] Found {class_name} in {module_path}: {Superpoint}" - ) - print( - f"🔍 [DEBUG] Superpoint has extract_local_features: {hasattr(Superpoint, 'extract_local_features')}" - ) - print( - f"🔍 [DEBUG] Superpoint already patched: {getattr(Superpoint, '__mw_patched__', False)}" - ) - - if not getattr(Superpoint, "__mw_patched__", False): - original_extract_local = Superpoint.extract_local_features - - @functools.wraps(original_extract_local) - def traced_extract_local(self, image0): - # Simply wrap the original method - no code rewriting - with tracer.start_as_current_span( - "unav.local_extractor.extract_local_features" - ): - return original_extract_local(self, image0) - - Superpoint.extract_local_features = traced_extract_local - Superpoint.__mw_patched__ = 
True - superpoint_patched = True - print( - f"🔧 ✅ Patched Superpoint.extract_local_features from {module_path}" - ) - else: - print("⚠️ Superpoint already patched, skipping") - superpoint_patched = True - else: - print(f"🔍 [DEBUG] {class_name} not found in {module_path}") - except ImportError as e: - print(f"🔍 [DEBUG] Could not import {module_path}: {e}") - except Exception as e: - print(f"⚠️ Error patching Superpoint from {module_path}: {e}") - import traceback - - traceback.print_exc() - - if not superpoint_patched: - print("⚠️ Could not find Superpoint class to patch in any known location") - - # Patch LightGlue._forward to trace internal matching operations - self._monkey_patch_lightglue(tracer) - - def _monkey_patch_lightglue(self, tracer): - """ - Patch LightGlue's internal modules to add granular tracing. - We patch the forward methods of: - - Transformer (self-attention) - - CrossTransformer (cross-attention) - - MatchAssignment (match scoring) - - LearnableFourierPositionalEncoding (position encoding) - And wrap the main _forward method. - """ - import functools - - lightglue_patched = False - import_paths_to_try = [ - "unav.core.third_party.LightGlue.lightglue", - "unav.core.feature.lightglue", - ] - - for module_path in import_paths_to_try: - if lightglue_patched: - break - try: - import importlib - - module = importlib.import_module(module_path) - - # Check if LightGlue class exists - if not hasattr(module, "LightGlue"): - print(f"🔍 [DEBUG] LightGlue not found in {module_path}") - continue - - LightGlue = getattr(module, "LightGlue") - print(f"🔍 [DEBUG] Found LightGlue in {module_path}") - - if getattr(LightGlue, "__mw_patched__", False): - print("⚠️ LightGlue already patched, skipping") - lightglue_patched = True - continue - - # 1. 
Patch Transformer.forward (self-attention) - if hasattr(module, "Transformer"): - Transformer = getattr(module, "Transformer") - if not getattr(Transformer, "__mw_patched__", False): - original_transformer_forward = Transformer.forward - - @functools.wraps(original_transformer_forward) - def traced_transformer_forward( - self, x0, x1, encoding0=None, encoding1=None - ): - with tracer.start_as_current_span( - "unav.local_matcher.self_attention" - ): - return original_transformer_forward( - self, x0, x1, encoding0, encoding1 - ) - - Transformer.forward = traced_transformer_forward - Transformer.__mw_patched__ = True - print("🔧 ✅ Patched Transformer.forward (self-attention)") - - # 2. Patch CrossTransformer.forward (cross-attention) - if hasattr(module, "CrossTransformer"): - CrossTransformer = getattr(module, "CrossTransformer") - if not getattr(CrossTransformer, "__mw_patched__", False): - original_cross_forward = CrossTransformer.forward - - @functools.wraps(original_cross_forward) - def traced_cross_forward(self, x0, x1): - with tracer.start_as_current_span( - "unav.local_matcher.cross_attention" - ): - return original_cross_forward(self, x0, x1) - - CrossTransformer.forward = traced_cross_forward - CrossTransformer.__mw_patched__ = True - print( - "🔧 ✅ Patched CrossTransformer.forward (cross-attention)" - ) - - # 3. Patch MatchAssignment.forward (match scoring) - if hasattr(module, "MatchAssignment"): - MatchAssignment = getattr(module, "MatchAssignment") - if not getattr(MatchAssignment, "__mw_patched__", False): - original_match_forward = MatchAssignment.forward - - @functools.wraps(original_match_forward) - def traced_match_forward(self, desc0, desc1): - with tracer.start_as_current_span( - "unav.local_matcher.match_assignment" - ): - return original_match_forward(self, desc0, desc1) - - MatchAssignment.forward = traced_match_forward - MatchAssignment.__mw_patched__ = True - print("🔧 ✅ Patched MatchAssignment.forward") - - # 4. 
Patch LearnableFourierPositionalEncoding.forward (position encoding) - if hasattr(module, "LearnableFourierPositionalEncoding"): - PosEnc = getattr(module, "LearnableFourierPositionalEncoding") - if not getattr(PosEnc, "__mw_patched__", False): - original_posenc_forward = PosEnc.forward - - @functools.wraps(original_posenc_forward) - def traced_posenc_forward(self, x): - with tracer.start_as_current_span( - "unav.local_matcher.position_encoding" - ): - return original_posenc_forward(self, x) - - PosEnc.forward = traced_posenc_forward - PosEnc.__mw_patched__ = True - print( - "🔧 ✅ Patched LearnableFourierPositionalEncoding.forward" - ) - - # 5. Patch filter_matches function - if hasattr(module, "filter_matches"): - original_filter = getattr(module, "filter_matches") - if not getattr(original_filter, "__mw_patched__", False): - - @functools.wraps(original_filter) - def traced_filter_matches(scores, th): - with tracer.start_as_current_span( - "unav.local_matcher.filter_matches" - ): - return original_filter(scores, th) - - traced_filter_matches.__mw_patched__ = True - setattr(module, "filter_matches", traced_filter_matches) - print("🔧 ✅ Patched filter_matches") - - # 6. 
Wrap the main LightGlue._forward method - original_lightglue_forward = LightGlue._forward - - @functools.wraps(original_lightglue_forward) - def traced_lightglue_forward(self, data): - with tracer.start_as_current_span("unav.local_matcher._forward"): - return original_lightglue_forward(self, data) - - LightGlue._forward = traced_lightglue_forward - LightGlue.__mw_patched__ = True - lightglue_patched = True - print(f"🔧 ✅ Patched LightGlue._forward from {module_path}") - - except ImportError as e: - print(f"🔍 [DEBUG] Could not import {module_path}: {e}") - except Exception as e: - print(f"⚠️ Error patching LightGlue from {module_path}: {e}") - import traceback - - traceback.print_exc() - - if not lightglue_patched: - print("⚠️ Could not find LightGlue class to patch in any known location") + """Delegated to logic.init""" + run_monkey_patch_feature_extractors(self) def _monkey_patch_matching_and_ransac(self): - """ - Patch the child functions of batch_local_matching_and_ransac. - The function is in unav.localizer.tools.matcher, and internally calls - match_query_to_database and ransac_filter from unav.core.feature_filter. 
- """ - if not hasattr(self, "tracer") or not self.tracer: - return - - import functools - - tracer = self.tracer - - # Patch unav.core.feature_filter functions at MODULE level - # These are called by the matcher module's batch_local_matching_and_ransac - try: - from unav.core import feature_filter - - print(f"🔍 [DEBUG] Attempting to patch unav.core.feature_filter") - print(f"🔍 [DEBUG] feature_filter module: {feature_filter}") - print( - f"🔍 [DEBUG] match_query_to_database: {hasattr(feature_filter, 'match_query_to_database')}" - ) - print( - f"🔍 [DEBUG] ransac_filter: {hasattr(feature_filter, 'ransac_filter')}" - ) - - # Check if already patched - if not getattr(feature_filter, "__mw_patched__", False): - # Patch match_query_to_database - if hasattr(feature_filter, "match_query_to_database"): - original_match = feature_filter.match_query_to_database - - @functools.wraps(original_match) - def traced_match(*args, **kwargs): - print("🔍 [TRACE] ✅ Entering match_query_to_database") - with tracer.start_as_current_span( - "unav.match_query_to_database" - ): - result = original_match(*args, **kwargs) - print( - f"🔍 [TRACE] ✅ Exiting match_query_to_database, returned {len(result[0]) if result[0] else 0} matches" - ) - return result - - feature_filter.match_query_to_database = traced_match - print("🔧 ✅ Patched match_query_to_database") - else: - print("⚠️ match_query_to_database not found in feature_filter") - - # Patch ransac_filter - if hasattr(feature_filter, "ransac_filter"): - original_ransac = feature_filter.ransac_filter - - @functools.wraps(original_ransac) - def traced_ransac(*args, **kwargs): - print("🔍 [TRACE] ✅ Entering ransac_filter") - with tracer.start_as_current_span("unav.ransac_filter"): - result = original_ransac(*args, **kwargs) - print("🔍 [TRACE] ✅ Exiting ransac_filter") - return result - - feature_filter.ransac_filter = traced_ransac - print("🔧 ✅ Patched ransac_filter") - else: - print("⚠️ ransac_filter not found in feature_filter") - - 
feature_filter.__mw_patched__ = True - print("🔧 Patched feature_filter module successfully") - else: - print("⚠️ feature_filter already patched, skipping") - except ImportError as e: - print(f"⚠️ Could not import unav.core.feature_filter: {e}") - import traceback - - traceback.print_exc() - except Exception as e: - print(f"⚠️ Failed to patch feature_filter: {e}") - import traceback - - traceback.print_exc() - - def get_places( - self, - target_place: Optional[str] = None, - target_building: Optional[str] = None, - target_floor: Optional[str] = None, - enable_multifloor: bool = False, - ): - """Get available places configuration, optionally filtering to a specific floor""" - try: - print("📁 Fetching places from data directory...") - - # Define folders to skip at all levels - SKIP_FOLDERS = { - "features", - "colmap_map", - ".ipynb_checkpoints", - "parameters", - } - - def should_skip_folder(folder_name): - """Check if folder should be skipped based on name patterns""" - return ( - folder_name in SKIP_FOLDERS - or "_old" in folder_name.lower() - or folder_name.endswith("_old") - ) - - places: Dict[str, Dict[str, List[str]]] = {} - - data_root = getattr(self, "DATA_ROOT", "/root/UNav-IO/data") - - # Get all place directories (depth 1 under data/) - if os.path.exists(data_root): - for place_name in os.listdir(data_root): - place_path = os.path.join(data_root, place_name) - if os.path.isdir(place_path) and not should_skip_folder(place_name): - if target_place and place_name != target_place: - continue - places[place_name] = {} - print(f" ✓ Found place: {place_name}") - - # Get buildings for this place (depth 2) - for building_name in os.listdir(place_path): - building_path = os.path.join(place_path, building_name) - if os.path.isdir(building_path) and not should_skip_folder( - building_name - ): - if target_building and building_name != target_building: - continue - floors = [] - - # Get floors for this building (depth 3) - for floor_name in os.listdir(building_path): - 
floor_path = os.path.join(building_path, floor_name) - if os.path.isdir( - floor_path - ) and not should_skip_folder(floor_name): - # Skip specific problematic floor - if ( - place_name == "New_York_City" - and building_name == "LOH" - and floor_name == "9_floor" - ): - print( - f" ⚠️ Skipping {building_name}/{floor_name}: explicitly excluded" - ) - continue - - if ( - not enable_multifloor - and target_floor - and floor_name != target_floor - ): - continue - - # Validate that required navigation files exist - boundaries_file = os.path.join(floor_path, "boundaries.json") - if not os.path.exists(boundaries_file): - print( - f" ⚠️ Skipping {building_name}/{floor_name}: missing boundaries.json" - ) - continue - - floors.append(floor_name) - - if floors: # Only add building if it has floors - places[place_name][building_name] = floors - print( - f" ✓ Building: {building_name} with floors: {floors}" - ) - - # Remove places that have no buildings - places = {k: v for k, v in places.items() if v} - - if not enable_multifloor and target_floor: - # Ensure we only return the specific floor requested - for p_name in list(places.keys()): - buildings = places[p_name] - for b_name in list(buildings.keys()): - filtered_floors = [ - f_name - for f_name in buildings[b_name] - if f_name == target_floor - ] - if filtered_floors: - buildings[b_name] = filtered_floors - else: - del buildings[b_name] - if not buildings: - del places[p_name] - - print(f"✅ Found {len(places)} places with buildings and floors") - return places - else: - print(f"⚠️ Data root {data_root} does not exist, using fallback") - return self._get_fallback_places() - - except Exception as e: - print(f"❌ Error fetching places: {e}, using fallback") - return self._get_fallback_places() - - def _get_fallback_places(self): - """Fallback hardcoded places configuration""" - return { - "New_York_City": {"LightHouse": ["3_floor", "4_floor", "6_floor"]}, - "New_York_University": {"Langone": ["15_floor", "16_floor", 
"17_floor"]}, - "Mahidol_University": {"Jubilee": ["fl1", "fl2", "fl3"]}, - } - - def ensure_maps_loaded( - self, - place: str, - building: str = None, - floor: str = None, - enable_multifloor: bool = False, - ): - """ - Ensure that maps for a specific place/building are loaded. - When building is specified, loads all floors for that building. - Creates selective localizer instances for true lazy loading. - """ - if building: - if enable_multifloor or not floor: - map_key = (place, building) - else: - map_key = (place, building, floor) - else: - map_key = place - - if map_key in self.maps_loaded: - return # Already loaded - - print(f"🔄 [Phase 4] Creating selective localizer for: {map_key}") - - # Create selective places config with only the requested location - if building: - selective_places = self.get_places( - target_place=place, - target_building=building, - target_floor=floor, - enable_multifloor=enable_multifloor, - ) - else: - selective_places = self.get_places(target_place=place) - - if not selective_places: - print( - "⚠️ No matching places found for selective load; skipping localizer creation" - ) - return - - # Create selective config and localizer - from unav.config import UNavConfig - - selective_config = UNavConfig( - data_final_root=self.DATA_ROOT, - places=selective_places, - global_descriptor_model=self.FEATURE_MODEL, - local_feature_model=self.LOCAL_FEATURE_MODEL, - ) - - from unav.localizer.localizer import UNavLocalizer - import time - - selective_localizer = UNavLocalizer(selective_config.localizer_config) - - # # Optionally patch the selective localizer too - if hasattr(self, "tracer") and self.tracer: - try: - self._monkey_patch_localizer_methods(selective_localizer) - # Note: feature extractors are already patched at module level, no need to patch per instance - except Exception as e: - print(f"⚠️ Failed to patch selective localizer: {e}") - - # Load maps and features with tracing if available - if hasattr(self, "tracer") and self.tracer: - 
with self.tracer.start_as_current_span( - "load_maps_and_features_span" - ) as load_span: - load_span.add_event("Starting map and feature loading") - load_span.set_attribute("map_key", str(map_key)) - load_span.set_attribute("selective_places", str(selective_places)) - - start_load_time = time.time() - selective_localizer.load_maps_and_features() - load_duration = time.time() - start_load_time - - load_span.set_attribute("load_duration_seconds", load_duration) - load_span.add_event("Map and feature loading completed") - else: - print(f"⏱️ Starting load_maps_and_features for: {map_key}") - start_load_time = time.time() - selective_localizer.load_maps_and_features() - load_duration = time.time() - start_load_time - print(f"⏱️ Completed load_maps_and_features in {load_duration:.2f} seconds") - - # Cache the selective localizer - self.selective_localizers[map_key] = selective_localizer - self.maps_loaded.add(map_key) - print(f"✅ Selective localizer created and maps loaded for: {map_key}") + """Delegated to logic.init""" + run_monkey_patch_matching_and_ransac(self) def ensure_gpu_components_ready(self): """ @@ -1044,71 +94,6 @@ def update_session(self, user_id: str, updates: dict): session = self.get_session(user_id) session.update(updates) - def _construct_mock_localization_output( - self, - x: float, - y: float, - angle: float, - place: str, - building: str, - floor: str, - ) -> dict: - """ - Construct a mock localization output from user-provided coordinates. - This allows skipping the actual localization phase when coordinates are known. 
- - Args: - x: X coordinate on the floor plan - y: Y coordinate on the floor plan - angle: Heading angle (direction user is facing) - place: Place name - building: Building name - floor: Floor name - - Returns: - dict: Mock localization output matching the structure from localizer.localize() - """ - return { - "floorplan_pose": { - "xy": [x, y], - "ang": angle - }, - "best_map_key": (place, building, floor), - "refinement_queue": {} # Empty since we're not doing actual localization - } - - @method() - def set_navigation_context( - self, - user_id: str, - dest_id: str, - target_place: str, - target_building: str, - target_floor: str, - unit: str = "meter", - language: str = "en", - ): - """Set navigation context for a user session""" - try: - self.update_session( - user_id, - { - "selected_dest_id": dest_id, - "target_place": target_place, - "target_building": target_building, - "target_floor": target_floor, - "unit": unit, - "language": language, - }, - ) - - return { - "status": "success", - "message": "Navigation context set successfully", - } - except Exception as e: - return {"status": "error", "message": str(e), "type": type(e).__name__} - @method() def start_server(self): import json @@ -1168,524 +153,35 @@ def planner( language: str = "en", refinement_queue: dict = None, is_vlm_extraction_enabled: bool = False, - enable_multifloor: bool = False, + enable_multifloor: bool = True, should_use_user_provided_coordinate: bool = False, x: float = None, y: float = None, angle: float = None, + turn_mode: str = "default", ): - """ - Full localization and navigation pipeline with timing tracking and middleware tracing. 
- - Args: - session_id: Unique identifier for the user session - base_64_image: Base64 encoded image or numpy array (optional if using provided coordinates) - destination_id: Destination node ID - place: Place name - building: Building name - floor: Floor name - top_k: Number of top candidates for localization - unit: Distance unit ("meter" or "feet") - language: Language code for instructions - refinement_queue: Queue for pose refinement - is_vlm_extraction_enabled: Enable VLM text extraction fallback - enable_multifloor: Enable multi-floor navigation - should_use_user_provided_coordinate: If True, skip localization and use provided x, y, angle - x: X coordinate on floor plan (required if should_use_user_provided_coordinate=True) - y: Y coordinate on floor plan (required if should_use_user_provided_coordinate=True) - angle: Heading angle in degrees (required if should_use_user_provided_coordinate=True) - """ - import time - import logging - import uuid - - # Generate a unique ID for this specific call to planner - call_id = str(uuid.uuid4()) - - # Check tracing availability - has_tracer = hasattr(self, "tracer") and self.tracer is not None - print( - f"📋 [PLANNER] Called with session_id={session_id}, call_id={call_id}, has_tracer={has_tracer}, tracer_type={type(getattr(self, 'tracer', None)).__name__}" + """Full localization and navigation pipeline.""" + return run_planner( + self, + session_id=session_id, + base_64_image=base_64_image, + destination_id=destination_id, + place=place, + building=building, + floor=floor, + top_k=top_k, + unit=unit, + language=language, + refinement_queue=refinement_queue, + is_vlm_extraction_enabled=is_vlm_extraction_enabled, + enable_multifloor=enable_multifloor, + should_use_user_provided_coordinate=should_use_user_provided_coordinate, + x=x, + y=y, + angle=angle, + turn_mode=turn_mode, ) - # Create parent span for the entire planner operation if tracer is available - if has_tracer: - print(f"📋 [PLANNER] Using TRACED execution 
path for call_id={call_id}") - with self.tracer.start_as_current_span("planner_span") as parent_span: - parent_span.set_attribute("unav.call_id", call_id) - parent_span.set_attribute("unav.session_id", session_id) - # Start total timing - start_time = time.time() - timing_data = {} - image = None - - # Validate user-provided coordinates if enabled - if should_use_user_provided_coordinate: - if x is None or y is None or angle is None: - return { - "status": "error", - "error": "When should_use_user_provided_coordinate=True, x, y, and angle must all be provided.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - print(f"📍 Using user-provided coordinates: x={x}, y={y}, angle={angle}°") - # Image is optional when using provided coordinates - if base_64_image is not None: - print("⚠️ Image provided but will be ignored since using provided coordinates") - elif base_64_image is None: - # Image is required when not using provided coordinates - return { - "status": "error", - "error": "No image provided. 
base_64_image parameter is required when not using provided coordinates.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - # Convert base64 string to BGR numpy array using OpenCV (skip if using provided coordinates) - if not should_use_user_provided_coordinate: - if isinstance(base_64_image, str): - import base64 - import cv2 - - try: - # Fix base64 padding if needed - base64_string = base_64_image - print( - f"Received base64 image string of length {len(base64_string)}" - ) - ## print the first 50 characers of bas64 string - # print(f"{base64_string[0:50]}") - # Remove data URL prefix if present (e.g., "data:image/jpeg;base64,") - if "," in base64_string: - base64_string = base64_string.split(",")[1] - - # Add padding if necessary - missing_padding = len(base64_string) % 4 - if missing_padding: - base64_string += "=" * (4 - missing_padding) - - # Decode base64 string to bytes - image_bytes = base64.b64decode(base64_string) - - # print(f"Image bytes {image_bytes}") - # Convert bytes to numpy array - image_array = np.frombuffer(image_bytes, dtype=np.uint8) - # Decode image using OpenCV (automatically in BGR format) - image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - - if image is None: - return { - "status": "error", - "error": "Failed to decode base64 image. Invalid image format.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - except Exception as img_error: - return { - "status": "error", - "error": f"Error processing base64 image: {str(img_error)}", - "timing": {"total": (time.time() - start_time) * 1000}, - } - elif isinstance(base_64_image, np.ndarray): - # If already a numpy array, use it directly (assume BGR format) - image = base_64_image - else: - return { - "status": "error", - "error": f"Unsupported image format. 
Expected base64 string or numpy array, got {type(base_64_image)}", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - # --- GPU DEBUG INFO --- - try: - import torch - - cuda_available = torch.cuda.is_available() - print(f"[GPU DEBUG] torch.cuda.is_available(): {cuda_available}") - if cuda_available: - print( - f"[GPU DEBUG] torch.cuda.device_count(): {torch.cuda.device_count()}" - ) - print( - f"[GPU DEBUG] torch.cuda.current_device(): {torch.cuda.current_device()}" - ) - print( - f"[GPU DEBUG] torch.cuda.get_device_name(0): {torch.cuda.get_device_name(0)}" - ) - # import subprocess - # nvidia_smi = subprocess.check_output(["nvidia-smi"]).decode() - # print(f"[GPU DEBUG] nvidia-smi output:\n{nvidia_smi}") - except Exception as gpu_debug_exc: - print(f"[GPU DEBUG] Error printing GPU info: {gpu_debug_exc}") - # --- END GPU DEBUG INFO --- - - try: - # Step 1: Setup and session management - setup_start = time.time() - - dest_id = destination_id - target_place = place - target_building = building - target_floor = floor - user_id = session_id - # Get user session - session = self.get_session(user_id) - - # Use provided parameters or fallback to session values - if not dest_id: - dest_id = session.get("selected_dest_id") - if not target_place: - target_place = session.get("target_place") - if not target_building: - target_building = session.get("target_building") - if not target_floor: - target_floor = session.get("target_floor") - if unit == "feet": - unit = session.get("unit", "feet") - if language == "en": - language = session.get("language", "en") - - # Check for required navigation context - if not all([dest_id, target_place, target_building, target_floor]): - return { - "status": "error", - "error": "Incomplete navigation context. 
Please select a destination.", - "missing_fields": { - "dest_id": dest_id is None, - "target_place": target_place is None, - "target_building": target_building is None, - "target_floor": target_floor is None, - }, - } - - # Automatically set/update navigation context in session - self.update_session( - user_id, - { - "selected_dest_id": dest_id, - "target_place": target_place, - "target_building": target_building, - "target_floor": target_floor, - "unit": unit, - "language": language, - }, - ) - - # Use provided refinement queue or get from session - if refinement_queue is None: - refinement_queue = session.get("refinement_queue") or {} - - timing_data["setup"] = ( - time.time() - setup_start - ) * 1000 # Convert to ms - print(f"⏱️ Setup: {timing_data['setup']:.2f}ms") - - # Step 2: Localization (or skip if using provided coordinates) - if should_use_user_provided_coordinate: - # Skip localization - construct mock output from provided coordinates - print("⏭️ Skipping localization - using provided coordinates") - localization_start = time.time() - - # Construct mock localization output - output = self._construct_mock_localization_output( - x=x, - y=y, - angle=angle, - place=target_place, - building=target_building, - floor=target_floor, - ) - - timing_data["localization"] = ( - time.time() - localization_start - ) * 1000 - print(f"⏱️ Mock Localization: {timing_data['localization']:.2f}ms") - else: - # Normal localization process - localization_start = time.time() - - # Create child span for localization - with self.tracer.start_as_current_span( - "localization_span" - ) as localization_span: - # Ensure GPU components are ready (initializes localizer) - self.ensure_gpu_components_ready() - - with self.tracer.start_as_current_span( - "load_maps_span" - ) as load_maps_span: - # Ensure maps are loaded for the target location - self.ensure_maps_loaded( - target_place, - target_building, - floor=target_floor, - enable_multifloor=enable_multifloor, - ) - - # Get the 
selective localizer for this building (all floors loaded) - map_key = (target_place, target_building) - localizer_to_use = self.selective_localizers.get(map_key) - if not localizer_to_use and target_floor: - floor_key = (target_place, target_building, target_floor) - localizer_to_use = self.selective_localizers.get( - floor_key, self.localizer - ) - else: - localizer_to_use = localizer_to_use or self.localizer - - # Localizer already patched in ensure_maps_loaded() or initialize_gpu_components() - # No need to patch again here to avoid double-wrapping spans - - output = localizer_to_use.localize( - image, refinement_queue, top_k=top_k - ) - - timing_data["localization"] = ( - time.time() - localization_start - ) * 1000 - print(f"⏱️ Localization: {timing_data['localization']:.2f}ms") - - if output is None or "floorplan_pose" not in output: - print("❌ Localization failed, no pose found.") - - if is_vlm_extraction_enabled: - # Run VLM to extract text from image as fallback - try: - print( - "🔄 Attempting VLM fallback for text extraction..." - ) - extracted_text = self.run_vlm_on_image(image) - - # Log the extracted text for debugging - print( - f"📝 VLM extracted text: {extracted_text[:200]}..." - ) - - # You can add logic here to process the extracted text - # For example, search for room numbers, building names, etc. - # and use that information to provide approximate location or guidance - - return { - "status": "error", - "error": "Localization failed, but VLM text extraction completed.", - "extracted_text": extracted_text, - "timing": timing_data, - "fallback_info": "Text was extracted from the image but precise localization failed. 
Please try taking a clearer photo or move to a different location.", - } - - except Exception as vlm_error: - print(f"❌ Error during VLM fallback: {vlm_error}") - return { - "status": "error", - "error": "Localization failed and VLM fallback also failed.", - "vlm_error": str(vlm_error), - "timing": timing_data, - } - - return { - "status": "error", - "error": "Localization failed, no pose found.", - "error_code": "localization_failed", - "timing": timing_data, - } - - # Step 3: Process localization results - processing_start = time.time() - - floorplan_pose = output["floorplan_pose"] - start_xy, start_heading = ( - floorplan_pose["xy"], - -floorplan_pose["ang"], - ) - source_key = output["best_map_key"] - start_place, start_building, start_floor = source_key - - # Validate start position - if start_xy is None or len(start_xy) != 2: - return { - "status": "error", - "error": "Invalid start position from localization.", - "timing": timing_data, - } - - # Check if we're on the right floor for navigation - if (start_place, start_building, start_floor) != ( - target_place, - target_building, - target_floor, - ): - # Multi-floor navigation will be handled by the navigator - pass - - # Update user's current floor context and pose for real-time tracking - self.update_session( - user_id, - { - "current_place": start_place, - "current_building": start_building, - "current_floor": start_floor, - "floorplan_pose": floorplan_pose, - "refinement_queue": output["refinement_queue"], - }, - ) - - # Convert dest_id to int if it's a string (common issue) - try: - dest_id_for_path = int(dest_id) - except (ValueError, TypeError): - dest_id_for_path = dest_id - - timing_data["processing"] = (time.time() - processing_start) * 1000 - print(f"⏱️ Processing: {timing_data['processing']:.2f}ms") - - # Step 4: Path planning - path_planning_start = time.time() - - # Create child span for path planning - with self.tracer.start_as_current_span( - "path_planning_span" - ) as path_planning_span: - 
# Plan navigation path to destination - result = self.nav.find_path( - start_place, - start_building, - start_floor, - start_xy, - target_place, - target_building, - target_floor, - dest_id_for_path, - ) - - timing_data["path_planning"] = ( - time.time() - path_planning_start - ) * 1000 - print(f"⏱️ Path Planning: {timing_data['path_planning']:.2f}ms") - - if result is None: - return { - "status": "error", - "error": "Path planning failed. Could not find route to destination.", - "timing": timing_data, - } - - # Check if result contains an error - if isinstance(result, dict) and "error" in result: - return { - "status": "error", - "error": f"Path planning failed: {result['error']}", - "timing": timing_data, - } - - # Step 5: Command generation - command_generation_start = time.time() - - # Create child span for command generation - with self.tracer.start_as_current_span( - "command_generation_span" - ) as command_span: - # Generate spoken/navigation commands - cmds = self.commander( - self.nav, - result, - initial_heading=start_heading, - unit=unit, - language=language, - ) - - timing_data["command_generation"] = ( - time.time() - command_generation_start - ) * 1000 - print( - f"⏱️ Command Generation: {timing_data['command_generation']:.2f}ms" - ) - - # Step 6: Serialization - serialization_start = time.time() - - serialized_result = self._safe_serialize(result) - serialized_cmds = self._safe_serialize(cmds) - serialized_source_key = self._safe_serialize(source_key) - serialized_floorplan_pose = self._safe_serialize(floorplan_pose) - - timing_data["serialization"] = ( - time.time() - serialization_start - ) * 1000 - print(f"⏱️ Serialization: {timing_data['serialization']:.2f}ms") - - # Calculate total time - timing_data["total"] = (time.time() - start_time) * 1000 - print(f"⏱️ Total Navigation Time: {timing_data['total']:.2f}ms") - - # Log summary - logging.info( - f"Navigation pipeline completed successfully. 
" - f"Total time: {timing_data['total']:.0f}ms, " - f"Localization: {timing_data['localization']:.0f}ms, " - f"Path planning: {timing_data['path_planning']:.0f}ms" - ) - - # Print summary - print(f"📊 Timing Breakdown:") - print( - f" Setup: {timing_data['setup']:.1f}ms ({timing_data['setup']/timing_data['total']*100:.1f}%)" - ) - print( - f" Localization: {timing_data['localization']:.1f}ms ({timing_data['localization']/timing_data['total']*100:.1f}%)" - ) - print( - f" Processing: {timing_data['processing']:.1f}ms ({timing_data['processing']/timing_data['total']*100:.1f}%)" - ) - print( - f" Path Planning: {timing_data['path_planning']:.1f}ms ({timing_data['path_planning']/timing_data['total']*100:.1f}%)" - ) - print( - f" Commands: {timing_data['command_generation']:.1f}ms ({timing_data['command_generation']/timing_data['total']*100:.1f}%)" - ) - print( - f" Serialization: {timing_data['serialization']:.1f}ms ({timing_data['serialization']/timing_data['total']*100:.1f}%)" - ) - - # Return all relevant info safely serialized for JSON - result = { - "status": "success", - "result": serialized_result, - "cmds": serialized_cmds, - "best_map_key": serialized_source_key, - "floorplan_pose": serialized_floorplan_pose, - "navigation_info": { - "start_location": f"{start_place}/{start_building}/{start_floor}", - "destination": f"{target_place}/{target_building}/{target_floor}", - "dest_id": dest_id, - "unit": unit, - "language": language, - }, - "timing": timing_data, # Include timing data in response - } - - return self.convert_navigation_to_trajectory(result) - - except Exception as e: - # Calculate partial timing data - timing_data["total"] = (time.time() - start_time) * 1000 - - # Log current state for debugging - try: - session = self.get_session(user_id) - except: - session = None - - import traceback - - traceback.print_exc() - - return { - "status": "error", - "error": str(e), - "type": type(e).__name__, - "timing": timing_data, - } - else: - print("📋 [PLANNER] 
Using NON-TRACED execution path") - pass - @method() def localize_user( self, @@ -1696,725 +192,20 @@ def localize_user( floor: str, top_k: int = None, refinement_queue: dict = None, - enable_multifloor: bool = False, - ): - """ - Localize user position without navigation planning. - Returns the user's current position and floor information. - - Args: - session_id: Unique identifier for the user session - base_64_image: Base64 encoded image or numpy array - place: Target place name - building: Target building name - floor: Target floor name - top_k: Number of top candidates to retrieve (optional) - refinement_queue: Queue for pose refinement (optional) - enable_multifloor: Whether to enable multi-floor localization - - Returns: - dict: Contains status, floorplan_pose, best_map_key, and timing information - """ - import time - import cv2 - import base64 - - start_time = time.time() - timing_data = {} - - # Validate and convert image input - if base_64_image is None: - return { - "status": "error", - "error": "No image provided. base_64_image parameter is required.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - # Convert base64 string to BGR numpy array - if isinstance(base_64_image, str): - try: - base64_string = base_64_image - - # Remove data URL prefix if present - if "," in base64_string: - base64_string = base64_string.split(",")[1] - - # Add padding if necessary - missing_padding = len(base64_string) % 4 - if missing_padding: - base64_string += "=" * (4 - missing_padding) - - # Decode and convert to image - image_bytes = base64.b64decode(base64_string) - image_array = np.frombuffer(image_bytes, dtype=np.uint8) - image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) - - if image is None: - return { - "status": "error", - "error": "Failed to decode base64 image. 
Invalid image format.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - except Exception as img_error: - return { - "status": "error", - "error": f"Error processing base64 image: {str(img_error)}", - "timing": {"total": (time.time() - start_time) * 1000}, - } - elif isinstance(base_64_image, np.ndarray): - image = base_64_image - else: - return { - "status": "error", - "error": f"Unsupported image format. Expected base64 string or numpy array, got {type(base_64_image)}", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - try: - # Setup and session management - setup_start = time.time() - - session = self.get_session(session_id) - - # Use provided refinement queue or get from session - if refinement_queue is None: - refinement_queue = session.get("refinement_queue") or {} - - timing_data["setup"] = (time.time() - setup_start) * 1000 - - # Localization - localization_start = time.time() - - # Ensure GPU components ready - self.ensure_gpu_components_ready() - - # Ensure maps are loaded - self.ensure_maps_loaded( - place, building, floor=floor, - enable_multifloor=enable_multifloor - ) - - # Get the selective localizer - map_key = (place, building) - localizer_to_use = self.selective_localizers.get(map_key) - if not localizer_to_use and floor: - floor_key = (place, building, floor) - localizer_to_use = self.selective_localizers.get(floor_key, self.localizer) - else: - localizer_to_use = localizer_to_use or self.localizer - - # Perform localization - output = localizer_to_use.localize(image, refinement_queue, top_k=top_k) - - timing_data["localization"] = (time.time() - localization_start) * 1000 - - # Check if localization succeeded - if output is None or "floorplan_pose" not in output: - return { - "status": "error", - "error": "Localization failed, no pose found.", - "error_code": "localization_failed", - "timing": timing_data, - } - - # Process localization results - processing_start = time.time() - - floorplan_pose = 
output["floorplan_pose"] - source_key = output["best_map_key"] - localized_place, localized_building, localized_floor = source_key - - # Update user's current floor context and pose - self.update_session( - session_id, - { - "current_place": localized_place, - "current_building": localized_building, - "current_floor": localized_floor, - "floorplan_pose": floorplan_pose, - "refinement_queue": output["refinement_queue"], - }, - ) - - timing_data["processing"] = (time.time() - processing_start) * 1000 - - # Serialization - serialization_start = time.time() - - serialized_source_key = self._safe_serialize(source_key) - serialized_floorplan_pose = self._safe_serialize(floorplan_pose) - - timing_data["serialization"] = (time.time() - serialization_start) * 1000 - - # Calculate total time - timing_data["total"] = (time.time() - start_time) * 1000 - - # Return localization results - return { - "status": "success", - "floorplan_pose": serialized_floorplan_pose, - "best_map_key": serialized_source_key, - "location_info": { - "place": localized_place, - "building": localized_building, - "floor": localized_floor, - "position": serialized_floorplan_pose.get("xy"), - "heading": serialized_floorplan_pose.get("ang"), - }, - "timing": timing_data, - } - - except Exception as e: - timing_data["total"] = (time.time() - start_time) * 1000 - - import traceback - traceback.print_exc() - - return { - "status": "error", - "error": str(e), - "type": type(e).__name__, - "timing": timing_data, - } - - def _safe_serialize(self, obj): - """Helper method to safely serialize objects for JSON response""" - import json - import numpy as np - - def convert_obj(o): - if isinstance(o, np.ndarray): - return o.tolist() - elif isinstance(o, np.integer): - return int(o) - elif isinstance(o, np.floating): - return float(o) - elif isinstance(o, dict): - return {k: convert_obj(v) for k, v in o.items()} - elif isinstance(o, (list, tuple)): - return [convert_obj(item) for item in o] - else: - return o - - 
return convert_obj(obj) - - @method() - def generate_nav_instructions_from_coordinates( - self, - session_id: str, - localization_result: dict, - dest_id: int, - target_place: str, - target_building: str, - target_floor: str, - unit: str = "meter", - language: str = "en", + enable_multifloor: bool = True, ): - """ - Generate navigation instructions from localized coordinates to a destination. - - This function takes the coordinates returned by localize_user and generates - step-by-step navigation instructions to reach the specified destination. - - Args: - session_id: Unique identifier for the user session - localization_result: Result dictionary from localize_user containing: - - floorplan_pose: Current position with xy coordinates and angle - - best_map_key: Tuple of (place, building, floor) - dest_id: Destination node ID in the navigation graph - target_place: Target place name - target_building: Target building name - target_floor: Target floor name - unit: Unit for distance measurements ("meter" or "foot") - language: Language code for instructions (e.g., "en", "es", "fr") - - Returns: - dict: Contains status, navigation instructions, path info, and timing - - status: "success" or "error" - - instructions: List of step-by-step navigation commands - - path_info: Details about the route - - timing: Performance metrics - """ - import time - - start_time = time.time() - timing_data = {} - - try: - # Step 1: Validate input - validation_start = time.time() - - if not localization_result or localization_result.get("status") != "success": - return { - "status": "error", - "error": "Invalid localization result. 
Please provide a successful localization_result from localize_user.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - floorplan_pose = localization_result.get("floorplan_pose") - best_map_key = localization_result.get("best_map_key") - - if not floorplan_pose or not best_map_key: - return { - "status": "error", - "error": "Missing floorplan_pose or best_map_key in localization result.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - # Extract start position - start_xy = floorplan_pose.get("xy") - start_heading = -floorplan_pose.get("ang", 0) # Negate angle for navigation - - if not start_xy or len(start_xy) != 2: - return { - "status": "error", - "error": "Invalid start position in floorplan_pose.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - # Extract start location - if isinstance(best_map_key, (list, tuple)) and len(best_map_key) >= 3: - start_place, start_building, start_floor = best_map_key[0], best_map_key[1], best_map_key[2] - else: - return { - "status": "error", - "error": "Invalid best_map_key format in localization result.", - "timing": {"total": (time.time() - start_time) * 1000}, - } - - timing_data["validation"] = (time.time() - validation_start) * 1000 - - # Step 2: Ensure GPU components and maps are ready - setup_start = time.time() - - self.ensure_gpu_components_ready() - self.ensure_maps_loaded( - target_place, target_building, floor=target_floor, - enable_multifloor=True - ) - - timing_data["setup"] = (time.time() - setup_start) * 1000 - - # Step 3: Path planning - path_planning_start = time.time() - - # Convert dest_id to int if necessary - try: - dest_id_for_path = int(dest_id) - except (ValueError, TypeError): - dest_id_for_path = dest_id - - # Plan navigation path to destination - result = self.nav.find_path( - start_place, - start_building, - start_floor, - start_xy, - target_place, - target_building, - target_floor, - dest_id_for_path, - ) - - timing_data["path_planning"] = 
(time.time() - path_planning_start) * 1000 - - if result is None: - return { - "status": "error", - "error": "Path planning failed. Could not find route to destination.", - "timing": timing_data, - } - - # Check if result contains an error - if isinstance(result, dict) and "error" in result: - return { - "status": "error", - "error": f"Path planning failed: {result['error']}", - "timing": timing_data, - } - - # Step 4: Generate navigation instructions - command_generation_start = time.time() - - # Generate spoken/navigation commands - cmds = self.commander( - self.nav, - result, - initial_heading=start_heading, - unit=unit, - language=language, - ) - - timing_data["command_generation"] = (time.time() - command_generation_start) * 1000 - - # Step 5: Serialize results - serialization_start = time.time() - - serialized_result = self._safe_serialize(result) - serialized_cmds = self._safe_serialize(cmds) - serialized_source_key = self._safe_serialize(best_map_key) - serialized_floorplan_pose = self._safe_serialize(floorplan_pose) - - timing_data["serialization"] = (time.time() - serialization_start) * 1000 - - # Calculate total time - timing_data["total"] = (time.time() - start_time) * 1000 - - # Step 6: Update session with navigation context - self.update_session( - session_id, - { - "current_place": start_place, - "current_building": start_building, - "current_floor": start_floor, - "target_place": target_place, - "target_building": target_building, - "target_floor": target_floor, - "selected_dest_id": dest_id, - }, - ) - - # Return navigation instructions in same format as planner - result_dict = { - "status": "success", - "result": serialized_result, - "cmds": serialized_cmds, - "best_map_key": serialized_source_key, - "floorplan_pose": serialized_floorplan_pose, - "navigation_info": { - "start_location": f"{start_place}/{start_building}/{start_floor}", - "destination": f"{target_place}/{target_building}/{target_floor}", - "dest_id": dest_id, - "unit": unit, - 
"language": language, - }, - "timing": timing_data, - } - - return self.convert_navigation_to_trajectory(result_dict) - - except Exception as e: - timing_data["total"] = (time.time() - start_time) * 1000 - - import traceback - traceback.print_exc() - - return { - "status": "error", - "error": str(e), - "type": type(e).__name__, - "timing": timing_data, - } - - @method() - def get_user_session(self, user_id: str): - """Get current user session data""" - try: - session = self.get_session(user_id) - return {"status": "success", "session": self._safe_serialize(session)} - except Exception as e: - return {"status": "error", "message": str(e), "type": type(e).__name__} - - @method() - def clear_user_session(self, user_id: str): - """Clear user session data""" - try: - if user_id in self.user_sessions: - del self.user_sessions[user_id] - return { - "status": "success", - "message": f"Session cleared for user {user_id}", - } - except Exception as e: - return {"status": "error", "message": str(e), "type": type(e).__name__} - - @method() - def unav_navigation_simple(self, inputs: dict): - """ - Simplified navigation interface that matches the original function signature. - - Args: - inputs (dict): { - "user_id": str, - "image": np.ndarray (BGR image), - "top_k": Optional[int] - } - - Returns: - dict: { - "result": dict (path info), - "cmds": list(str) (step-by-step instructions), - "best_map_key": tuple(str, str, str) (current floor), - "floorplan_pose": dict (current pose) - } - or dict with "error" key on failure. 
- """ - try: - user_id = inputs["user_id"] - image = inputs["image"] - top_k = inputs.get("top_k", None) - - session = self.get_session(user_id) - - # Check for required navigation context - dest_id = session.get("selected_dest_id") - target_place = session.get("target_place") - target_building = session.get("target_building") - target_floor = session.get("target_floor") - unit = session.get("unit", "meter") - user_lang = session.get("language", "en") - - if not all([dest_id, target_place, target_building, target_floor]): - return { - "error": "Incomplete navigation context. Please select a destination." - } - - # Call the main navigation method - result = self.planner( - user_id=user_id, - image=image, - dest_id=dest_id, - target_place=target_place, - target_building=target_building, - target_floor=target_floor, - top_k=top_k, - unit=unit, - language=user_lang, - refinement_queue=session.get("refinement_queue"), - ) - - # Return in the expected format - if result.get("status") == "success": - return { - "result": result["result"], - "cmds": result["cmds"], - "best_map_key": result["best_map_key"], - "floorplan_pose": result["floorplan_pose"], - } - else: - return {"error": result.get("error", "Navigation failed")} - - except Exception as e: - return {"error": str(e)} - - def convert_navigation_to_trajectory( - self, navigation_result: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Convert navigation result format to trajectory output format. 
- - Args: - navigation_result: Dictionary containing navigation result data - - Returns: - Dictionary in trajectory output format - """ - - # Extract data from navigation result - result = navigation_result.get("result", {}) - cmds = navigation_result.get("cmds", []) - best_map_key = navigation_result.get("best_map_key", []) - floorplan_pose = navigation_result.get("floorplan_pose", {}) - navigation_info = navigation_result.get("navigation_info", {}) - - # Extract building, place, and floor information - place = best_map_key[0] if len(best_map_key) > 0 else "" - building = best_map_key[1] if len(best_map_key) > 1 else "" - floor = best_map_key[2] if len(best_map_key) > 2 else "" - - # Get path coordinates from result - path_coords = result.get("path_coords", []) - - # Add starting pose coordinates if available - start_xy = floorplan_pose.get("xy", []) - start_ang = floorplan_pose.get("ang", 0) - - # Create paths array - include start position with angle if available - paths = [] - if start_xy and len(start_xy) >= 2: - if start_ang: - paths.append([start_xy[0], start_xy[1], start_ang]) - else: - paths.append(start_xy) - - # Add all path coordinates - for coord in path_coords: - if len(coord) >= 2: - paths.append(coord) - - # Calculate scale based on total cost and path distance (approximate) - # This is an estimation - you may need to adjust based on your specific use case - total_cost = result.get("total_cost", 0) - scale = ( - 0.02205862195 # Default scale, you might want to calculate this dynamically + """Localize user position without navigation planning.""" + return run_localize_user( + self, + session_id=session_id, + base_64_image=base_64_image, + place=place, + building=building, + floor=floor, + top_k=top_k, + refinement_queue=refinement_queue, + enable_multifloor=enable_multifloor, ) - # Create trajectory structure - trajectory_data = { - "trajectory": [ - { - "0": { - "name": "destination", - "building": building, - "floor": floor, - "paths": paths, - 
"command": { - "instructions": cmds, - "are_instructions_generated": len(cmds) > 0, - }, - "scale": scale, - } - }, - None, # This seems to be a placeholder in the original format - ], - "scale": scale, - } - - return trajectory_data - - def run_vlm_on_image(self, image: np.ndarray) -> str: - """ - Run VLM on the provided image to extract text using Gemini 2.5 Flash. - - Args: - image (np.ndarray): BGR image array. - - Returns: - str: Extracted text from the image. - """ - # Create span for VLM extraction if tracer available - if hasattr(self, "tracer") and self.tracer: - with self.tracer.start_as_current_span( - "vlm_text_extraction_span" - ) as vlm_span: - try: - # 1) Import required libraries - from google import genai - from google.genai import types - import cv2 - import os - - # 2) Assign API key - get from environment variable (set by Modal Secret) - GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") - if not GEMINI_API_KEY: - error_msg = "GEMINI_API_KEY environment variable not set. Please set it with your API key in Modal Secrets." - print(f"❌ {error_msg}") - return error_msg - - # Create client with API key - client = genai.Client(api_key=GEMINI_API_KEY) - - # 3) Configure with Gemini 2.5 Flash model - GEMINI_MODEL = "gemini-2.5-flash" - - # 4) Run VLM on the image - # Convert BGR to RGB for proper processing - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - # Encode image to bytes (JPEG format) - _, image_bytes = cv2.imencode(".jpg", image_rgb) - image_bytes = image_bytes.tobytes() - - # Create the prompt for text extraction - prompt = """Analyze this image and extract all visible text content. - Please provide: - 1. All readable text, signs, labels, and written content - 2. Any numbers, codes, or identifiers visible - 3. 
Location descriptions or directional information if present - - Format the response as clear, readable text without extra formatting.""" - - # Generate content using the model with proper SDK usage - response = client.models.generate_content( - model=GEMINI_MODEL, - contents=[ - prompt, - types.Part.from_bytes( - data=image_bytes, mime_type="image/jpeg" - ), - ], - ) - - # Extract the text response - extracted_text = ( - response.text if response.text else "No text extracted" - ) - - print( - f"✅ VLM extraction successful: {len(extracted_text)} characters extracted" - ) - - return extracted_text - - except ImportError as e: - error_msg = f"Missing required library for VLM: {str(e)}. Please install: pip install google-genai" - print(f"❌ {error_msg}") - return error_msg - except Exception as e: - error_msg = f"VLM extraction failed: {str(e)}" - print(f"❌ {error_msg}") - return error_msg - else: - try: - # 1) Import required libraries - from google import genai - from google.genai import types - import cv2 - import os - - # 2) Assign API key - get from environment variable (set by Modal Secret) - GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") - if not GEMINI_API_KEY: - error_msg = "GEMINI_API_KEY environment variable not set. Please set it with your API key in Modal Secrets." - print(f"❌ {error_msg}") - return error_msg - - # Create client with API key - client = genai.Client(api_key=GEMINI_API_KEY) - - # 3) Configure with Gemini 2.5 Flash model - GEMINI_MODEL = "gemini-2.5-flash" - - # 4) Run VLM on the image - # Convert BGR to RGB for proper processing - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - # Encode image to bytes (JPEG format) - _, image_bytes = cv2.imencode(".jpg", image_rgb) - image_bytes = image_bytes.tobytes() - - # Create the prompt for text extraction - prompt = """Analyze this image and extract all visible text content. - Please provide: - 1. All readable text, signs, labels, and written content - 2. 
Any numbers, codes, or identifiers visible - 3. Location descriptions or directional information if present - - Format the response as clear, readable text without extra formatting.""" - - # Generate content using the model with proper SDK usage - response = client.models.generate_content( - model=GEMINI_MODEL, - contents=[ - prompt, - types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg"), - ], - ) - - # Extract the text response - extracted_text = response.text if response.text else "No text extracted" - - print( - f"✅ VLM extraction successful: {len(extracted_text)} characters extracted" - ) - - return extracted_text - except ImportError as e: - error_msg = f"Missing required library for VLM: {str(e)}. Please install: pip install google-genai" - print(f"❌ {error_msg}") - return error_msg - except Exception as e: - error_msg = f"VLM extraction failed: {str(e)}" - print(f"❌ {error_msg}") - return error_msg +# End of file