fix: add kube impl for discovery and add metadata endpoint #4136
Conversation
Signed-off-by: mohammedabdulwahhab <[email protected]>
Signed-off-by: mohammedabdulwahhab <[email protected]>
Signed-off-by: mohammedabdulwahhab <[email protected]>
**Walkthrough**

This pull request replaces etcd-based service discovery with a Kubernetes-native discovery client, introducing a new discovery architecture with support for both Kubernetes (via EndpointSlices) and KV store backends. It includes comprehensive Kubernetes testing infrastructure, public API updates for discovery interfaces, and integration across multiple service components.
**Estimated code review effort:** 🎯 4 (Complex) | ⏱️ ~90 minutes
**Pre-merge checks**
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
lib/runtime/src/component/endpoint.rs (1)
194-225: Keep the discovery registration alive
`DiscoveryClient::register` hands back an RAII registration guard whose `Drop` removes the entry. Because we don’t bind the returned guard, it’s dropped immediately, so nothing ever shows up in discovery even though we logged success. Hold the guard for the lifetime of the endpoint task (e.g., bind it to `_discovery_guard`) and only let it drop when we’re shutting down.

```diff
-    if let Err(e) = discovery_client.register(discovery_spec).await {
-        tracing::error!(
-            component_name,
-            endpoint_name,
-            error = %e,
-            "Unable to register service for discovery"
-        );
-        endpoint_shutdown_token.cancel();
-        return Err(error!(
-            "Unable to register service for discovery. Check discovery service status"
-        ));
-    }
+    let _discovery_guard = match discovery_client.register(discovery_spec).await {
+        Ok(guard) => guard,
+        Err(e) => {
+            tracing::error!(
+                component_name,
+                endpoint_name,
+                error = %e,
+                "Unable to register service for discovery"
+            );
+            endpoint_shutdown_token.cancel();
+            return Err(error!(
+                "Unable to register service for discovery. Check discovery service status"
+            ));
+        }
+    };
```

lib/runtime/src/discovery/mock.rs (1)
124-166: Discovery mock never emits updates for modified instances

Line 145 currently only inserts the instance_id into `known_instances`. Once an id is present, any later change to that instance’s payload is ignored because the `HashSet` can’t detect data mutations. The real discovery backends re-emit an `Added` event when the resource changes, so the mock now diverges: tests that rely on watching for config updates will never see them.

Please track the full instance alongside its id and compare the payload before deciding whether to skip the event.

```diff
-    use std::collections::HashSet;
+    use std::collections::HashMap;
…
-    let mut known_instances = HashSet::new();
+    let mut known_instances: HashMap<u64, DiscoveryInstance> = HashMap::new();
…
     let current: Vec<_> = {
         let instances = registry.instances.lock().unwrap();
         instances
             .iter()
             .filter(|instance| matches_key(instance, &key))
             .cloned()
             .collect()
     };
-    let current_ids: HashSet<_> = current.iter().map(|i| {
-        match i {
-            DiscoveryInstance::Endpoint(inst) => inst.instance_id,
-            DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
-        }
-    }).collect();
+    let current_map: HashMap<u64, DiscoveryInstance> = current
+        .iter()
+        .map(|instance| {
+            let id = match instance {
+                DiscoveryInstance::Endpoint(inst) => inst.instance_id,
+                DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
+            };
+            (id, instance.clone())
+        })
+        .collect();

     // Emit Added events for new instances
-    for instance in current {
-        let id = match &instance {
-            DiscoveryInstance::Endpoint(inst) => inst.instance_id,
-            DiscoveryInstance::ModelCard { instance_id, .. } => *instance_id,
-        };
-        if known_instances.insert(id) {
-            yield Ok(DiscoveryEvent::Added(instance));
-        }
+    for (id, instance) in current_map.iter() {
+        match known_instances.get(id) {
+            Some(previous) if previous == instance => {}
+            _ => {
+                known_instances.insert(*id, instance.clone());
+                yield Ok(DiscoveryEvent::Added(instance.clone()));
+            }
+        }
     }

     // Emit Removed events for instances that are gone
-    for id in known_instances.difference(&current_ids).cloned().collect::<Vec<_>>() {
-        yield Ok(DiscoveryEvent::Removed(id));
-        known_instances.remove(&id);
+    for id in known_instances
+        .keys()
+        .filter(|id| !current_map.contains_key(id))
+        .copied()
+        .collect::<Vec<_>>()
+    {
+        known_instances.remove(&id);
+        yield Ok(DiscoveryEvent::Removed(id));
     }
```

lib/llm/src/discovery/watcher.rs (1)
181-209: Do not collapse model-card keys to instance_id-only

A single pod can publish multiple `DiscoveryInstance::ModelCard`s (e.g., generate + prefill endpoints share the same hashed `instance_id`). By switching the manager key to just `format!("{:x}", instance_id)`, every additional card from the same pod overwrites the previous entry. When that pod is later removed, `handle_delete` only sees the last card, so the earlier models remain registered forever and never get cleaned up. You can observe the multi-card emission in `KubeDiscoveryClient::filter_metadata`, which yields one `DiscoveryInstance::ModelCard` per advertised model for the same `instance_id`. Please keep the key composite (namespace/component/endpoint + instance_id) like the legacy etcd path, e.g.:

```diff
-    let key = format!("{:x}", instance_id);
+    let key = format!(
+        "{}/{}/{}/{:x}",
+        endpoint_id.namespace, endpoint_id.component, endpoint_id.name, instance_id
+    );
```

and mirror the same format in the delete path so removal evicts every model registered by that worker.
🧹 Nitpick comments (9)
.gitignore (1)
112-112: LGTM!

The addition of `rebuild.sh` to `.gitignore` is appropriate. If this script is generated by or specific to direnv workflows, consider optionally updating the section comment to reflect that it covers both direnv config files and related development artifacts.

k8s-test/README.md (1)
21-43: Annotate fenced blocks with languages
`markdownlint` is flagging MD040 because these fences don’t declare a language. Tag them with something like `bash` (or `text` where appropriate) so the docs lint cleanly.

k8s-test/LOCAL_TESTING.md (2)
72-76: Add language identifier to fenced code block.

The fenced code block starting at line 72 is missing a language identifier, which affects syntax highlighting and rendering.

Apply this diff to add the language identifier:

```diff
-```
+```text
 Local test mode: using localhost:8080 for pod dynamo-test-worker-8080
 Fetching metadata from http://localhost:8080/metadata
```

As per static analysis hints

208-208: Consider adding comma for clarity.

The sentence could benefit from a comma after "helper script" for improved readability.

```diff
-**Problem:** You created a pod without using the helper script and the name doesn't end with a port number.
+**Problem:** You created a pod without using the helper script, and the name doesn't end with a port number.
```

As per static analysis hints
lib/llm/src/kv_router/subscriber.rs (2)
307-324: Consider logging Added events for observability.

The code silently ignores `DiscoveryEvent::Added` events. While the subscriber only needs to handle removals for cleanup, logging Added events would improve observability and help with debugging discovery issues.

Apply this diff to add logging for Added events:

```diff
 Some(discovery_event_result) = instance_event_stream.next() => {
     let Ok(discovery_event) = discovery_event_result else {
         continue;
     };
-    let dynamo_runtime::discovery::DiscoveryEvent::Removed(worker_id) = discovery_event else {
+    match discovery_event {
+        dynamo_runtime::discovery::DiscoveryEvent::Added(instance_id) => {
+            tracing::debug!(
+                instance_id = instance_id,
+                "DISCOVERY: Generate endpoint instance added"
+            );
+        }
+        dynamo_runtime::discovery::DiscoveryEvent::Removed(worker_id) => {
+            tracing::warn!(
+                worker_id = worker_id,
+                "DISCOVERY: Generate endpoint instance removed, removing worker"
+            );
+
+            tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
+            if let Err(e) = remove_worker_tx.send(worker_id).await {
+                tracing::warn!("Failed to send worker removal for worker {worker_id}: {e}");
+            }
+        }
+    }
-        continue;
-    };
-
-    tracing::warn!(
-        worker_id = worker_id,
-        "DISCOVERY: Generate endpoint instance removed, removing worker"
-    );
-
-    tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
-    if let Err(e) = remove_worker_tx.send(worker_id).await {
-        tracing::warn!("Failed to send worker removal for worker {worker_id}: {e}");
-    }
 }
```
321-321: Remove or downgrade DISCOVERY_VALIDATION debug logging.

The `DISCOVERY_VALIDATION` prefix suggests this is temporary debug/validation logging. Consider removing it or downgrading to `trace` level once the discovery implementation is stable.

```diff
-    tracing::warn!("DISCOVERY_VALIDATION: remove_worker_tx: worker_id={}", worker_id);
+    tracing::trace!("Sending worker removal: worker_id={}", worker_id);
```

k8s-test/run-tests.sh (1)
43-46: Consider adding cargo availability check.

The script assumes `cargo` is available but doesn't validate this before attempting to run tests. While `set -e` will catch the error, an explicit check with a helpful message would improve user experience.

Add this check after the kubectl validation:

```diff
 echo "✅ kubectl is configured"
 echo "   Cluster: $(kubectl config current-context)"
 echo ""
+
+# Check if cargo is available
+if ! command -v cargo &> /dev/null; then
+    echo "❌ cargo is not available"
+    echo "   Please ensure Rust and cargo are installed"
+    exit 1
+fi
```

k8s-test/create-local-test-pod.sh (1)
44-45: Consider including port in SERVICE_NAME to avoid conflicts.

The `SERVICE_NAME` is derived only from `DYNAMO_COMPONENT`, which could cause conflicts if the script is run multiple times with the same component but different ports. The pod name includes the port for uniqueness, but the service name does not.

Apply this diff to make the service name unique:

```diff
 POD_NAME="dynamo-test-worker-${PORT}"
-SERVICE_NAME="dynamo-test-${DYNAMO_COMPONENT}"
+SERVICE_NAME="dynamo-test-${DYNAMO_COMPONENT}-${PORT}"
```

This ensures each service is uniquely identified and prevents conflicts when running multiple local test servers for the same component on different ports.
lib/runtime/src/component/client.rs (1)
242-379: Release the cache lock before awaiting discovery

Line 257 keeps the `instance_sources` mutex held while awaiting `list_and_watch`. If that network call stalls (e.g. Kubernetes API lag), every other endpoint trying to resolve its instance source will block on the mutex. Please narrow the critical section to the cache check/insert and drop the guard before any awaits.

```diff
-    let instance_sources = drt.instance_sources();
-    let mut instance_sources = instance_sources.lock().await;
+    let instance_sources = drt.instance_sources();
+    let mut instance_sources_guard = instance_sources.lock().await;
…
-    if let Some(instance_source) = instance_sources.get(endpoint) {
+    if let Some(instance_source) = instance_sources_guard.get(endpoint) {
         if let Some(instance_source) = instance_source.upgrade() {
             tracing::debug!(
                 "get_or_create_dynamic_instance_source: Found cached instance source for endpoint: {}",
                 endpoint.path()
             );
             return Ok(instance_source);
         } else {
             tracing::debug!(
                 "get_or_create_dynamic_instance_source: Cached instance source was dropped, removing for endpoint: {}",
                 endpoint.path()
             );
-            instance_sources.remove(endpoint);
+            instance_sources_guard.remove(endpoint);
         }
     }
+    drop(instance_sources_guard);
+
     tracing::debug!(
         "get_or_create_dynamic_instance_source: Creating new instance source for endpoint: {}",
         endpoint.path()
     );
     let discovery_client = drt.discovery_client();
…
-    instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
+    let mut instance_sources_guard = instance_sources.lock().await;
+    instance_sources_guard.insert(endpoint.clone(), Arc::downgrade(&instance_source));
```
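For readers unfamiliar with the pattern being suggested, here is a generic, simplified sketch (not the actual `client.rs` code) of holding a tokio mutex only across the cache check and the insert, with the slow call awaited outside the critical section. The cache shape and `fetch_remote` helper are illustrative assumptions.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Take the lock only for the cache check, drop the guard, then await the slow call.
async fn get_or_fetch(cache: Arc<Mutex<Option<String>>>) -> String {
    {
        let guard = cache.lock().await;
        if let Some(value) = guard.as_ref() {
            return value.clone();
        }
    } // guard dropped here, before any slow awaits

    let fresh = fetch_remote().await; // stands in for list_and_watch in the real code

    let mut guard = cache.lock().await; // re-acquire briefly just to insert
    *guard = Some(fresh.clone());
    fresh
}

// Placeholder for the slow discovery/network call.
async fn fetch_remote() -> String {
    "instances".to_string()
}
```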
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `lib/bindings/python/Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (38)
- `.gitignore` (1 hunks)
- `components/src/dynamo/vllm/main.py` (1 hunks)
- `k8s-test/LOCAL_TESTING.md` (1 hunks)
- `k8s-test/README.md` (1 hunks)
- `k8s-test/cleanup.sh` (1 hunks)
- `k8s-test/create-local-test-pod.sh` (1 hunks)
- `k8s-test/deploy.sh` (1 hunks)
- `k8s-test/manifests/test-deployment.yaml` (1 hunks)
- `k8s-test/run-tests.sh` (1 hunks)
- `lib/llm/src/discovery/watcher.rs` (7 hunks)
- `lib/llm/src/discovery/worker_monitor.rs` (3 hunks)
- `lib/llm/src/entrypoint/input/common.rs` (2 hunks)
- `lib/llm/src/entrypoint/input/grpc.rs` (2 hunks)
- `lib/llm/src/entrypoint/input/http.rs` (2 hunks)
- `lib/llm/src/http/service/clear_kv_blocks.rs` (3 hunks)
- `lib/llm/src/http/service/health.rs` (1 hunks)
- `lib/llm/src/http/service/service_v2.rs` (4 hunks)
- `lib/llm/src/kv_router.rs` (3 hunks)
- `lib/llm/src/kv_router/scheduler.rs` (1 hunks)
- `lib/llm/src/kv_router/subscriber.rs` (3 hunks)
- `lib/llm/src/local_model.rs` (2 hunks)
- `lib/llm/tests/http_metrics.rs` (3 hunks)
- `lib/runtime/Cargo.toml` (2 hunks)
- `lib/runtime/src/component.rs` (2 hunks)
- `lib/runtime/src/component/client.rs` (4 hunks)
- `lib/runtime/src/component/endpoint.rs` (2 hunks)
- `lib/runtime/src/discovery/kube.rs` (1 hunks)
- `lib/runtime/src/discovery/kv_store.rs` (1 hunks)
- `lib/runtime/src/discovery/mock.rs` (6 hunks)
- `lib/runtime/src/discovery/mod.rs` (5 hunks)
- `lib/runtime/src/discovery/utils.rs` (1 hunks)
- `lib/runtime/src/distributed.rs` (5 hunks)
- `lib/runtime/src/instances.rs` (1 hunks)
- `lib/runtime/src/lib.rs` (1 hunks)
- `lib/runtime/src/storage/key_value_store.rs` (1 hunks)
- `lib/runtime/src/system_status_server.rs` (4 hunks)
- `lib/runtime/tests/kube_client_integration.rs` (1 hunks)
- `lib/runtime/tests/kube_discovery_integration.rs` (1 hunks)
🧰 Additional context used
🧠 Learnings (20)
📚 Learning: 2025-11-05T08:41:06.483Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 4070
File: lib/discovery/src/systems/etcd/peer.rs:151-188
Timestamp: 2025-11-05T08:41:06.483Z
Learning: In lib/discovery/src/systems/etcd/peer.rs, the register_instance method intentionally captures the lease_id before entering the OperationExecutor closure. If the lease is revoked or fails, the operation should hard-fail rather than retry with a new lease, because the system does not track which entries were registered under which lease. Retrying with a fresh lease would create inconsistent state.
Applied to files:
- `lib/runtime/src/instances.rs`
- `lib/runtime/src/discovery/kv_store.rs`
- `lib/runtime/src/component/endpoint.rs`
- `lib/llm/src/discovery/worker_monitor.rs`
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/runtime/src/discovery/mock.rs`
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.
Applied to files:
- `lib/llm/src/http/service/clear_kv_blocks.rs`
- `lib/llm/src/kv_router/subscriber.rs`
📚 Learning: 2025-09-02T16:46:54.015Z
Learnt from: GuanLuo
Repo: ai-dynamo/dynamo PR: 2714
File: lib/llm/src/discovery/model_entry.rs:38-42
Timestamp: 2025-09-02T16:46:54.015Z
Learning: In lib/llm/src/discovery/model_entry.rs, GuanLuo prefers not to add serde defaults for model_type and model_input fields to keep the specification explicit and avoid user errors, relying on atomic deployment strategy to avoid backward compatibility issues.
Applied to files:
- `lib/llm/src/local_model.rs`
- `lib/llm/src/kv_router.rs`
- `lib/runtime/src/lib.rs`
- `lib/runtime/src/component/endpoint.rs`
- `lib/llm/src/entrypoint/input/common.rs`
- `lib/llm/src/entrypoint/input/grpc.rs`
- `lib/llm/tests/http_metrics.rs`
- `lib/llm/src/discovery/worker_monitor.rs`
- `lib/llm/src/http/service/service_v2.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/runtime/src/discovery/mock.rs`
- `lib/runtime/src/discovery/mod.rs`
- `lib/llm/src/entrypoint/input/http.rs`
📚 Learning: 2025-08-21T17:23:02.836Z
Learnt from: michaelfeil
Repo: ai-dynamo/dynamo PR: 2591
File: lib/bindings/python/rust/http.rs:0-0
Timestamp: 2025-08-21T17:23:02.836Z
Learning: In lib/bindings/python/rust/http.rs, the enable_endpoint method uses EndpointType::all() to dynamically support all available endpoint types with case-insensitive matching, which is more maintainable than hardcoded match statements for endpoint type mappings.
Applied to files:
lib/llm/src/local_model.rs
📚 Learning: 2025-10-16T13:35:33.710Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 3659
File: lib/llm/src/common/checked_file.rs:113-124
Timestamp: 2025-10-16T13:35:33.710Z
Learning: In the dynamo project, model deployment cards stored in etcd are cleared by lease expiration, so there's no persistence of old card data across system restarts or upgrades.
Applied to files:
lib/llm/src/local_model.rs
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally. This is a consistent pattern used throughout the codebase in multiple critical modules.
Applied to files:
- `lib/llm/src/kv_router.rs`
- `lib/runtime/src/storage/key_value_store.rs`
- `lib/llm/src/entrypoint/input/grpc.rs`
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/runtime/src/component/client.rs`
- `lib/llm/src/entrypoint/input/http.rs`
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, `PrefixWatcher` uses `#[derive(Dissolve)]` to generate a `dissolve()` method. The pattern `let (_, _watcher, mut events_rx) = prefix_watcher.dissolve();` is the standard and intended usage throughout the codebase. The `mpsc::Receiver<WatchEvent>` maintains the etcd watch stream independently, so the `Watcher` handle can be safely dropped. This pattern is used consistently in critical infrastructure modules like component/client.rs, utils/leader_worker_barrier.rs, and entrypoint/input/http.rs.
Applied to files:
- `lib/llm/src/kv_router.rs`
- `lib/runtime/src/discovery/utils.rs`
- `lib/runtime/src/storage/key_value_store.rs`
- `lib/llm/src/entrypoint/input/common.rs`
- `lib/llm/src/entrypoint/input/grpc.rs`
- `lib/llm/tests/http_metrics.rs`
- `lib/llm/src/discovery/worker_monitor.rs`
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/runtime/src/component/client.rs`
- `lib/llm/src/entrypoint/input/http.rs`
📚 Learning: 2025-08-15T23:51:04.958Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, when using `kv_get_and_watch_prefix()` and `dissolve()`, the returned `events_rx` receiver maintains the etcd watch stream independently. The watcher handle can be safely dropped (using `_watcher`) without terminating the stream, as the receiver keeps the connection alive internally.
Applied to files:
- `lib/llm/src/kv_router.rs`
- `lib/runtime/src/storage/key_value_store.rs`
- `lib/llm/src/entrypoint/input/grpc.rs`
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/llm/src/entrypoint/input/http.rs`
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Applied to files:
- `lib/llm/src/kv_router.rs`
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/runtime/src/component/client.rs`
📚 Learning: 2025-06-16T20:02:54.935Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1236
File: lib/llm/src/mocker/protocols.rs:85-112
Timestamp: 2025-06-16T20:02:54.935Z
Learning: When using derive_builder::Builder macro, the macro generates the builder struct and its methods, but does NOT generate a `builder()` method on the original struct. A manual `impl StructName { pub fn builder() -> StructNameBuilder { StructNameBuilder::default() } }` is required to provide the convenient `StructName::builder()` API pattern.
Applied to files:
lib/llm/src/kv_router.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Applied to files:
- `lib/runtime/src/storage/key_value_store.rs`
- `lib/llm/src/discovery/watcher.rs`
- `lib/llm/src/entrypoint/input/http.rs`
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
Applied to files:
lib/llm/tests/http_metrics.rs
📚 Learning: 2025-05-30T06:38:09.630Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.
Applied to files:
lib/llm/src/discovery/worker_monitor.rs
📚 Learning: 2025-06-05T01:02:15.318Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.
Applied to files:
lib/llm/src/discovery/worker_monitor.rs
📚 Learning: 2025-06-06T21:48:35.214Z
Learnt from: biswapanda
Repo: ai-dynamo/dynamo PR: 1412
File: lib/bindings/python/src/dynamo/runtime/logging.py:100-100
Timestamp: 2025-06-06T21:48:35.214Z
Learning: In the Dynamo codebase, BentoML has been completely removed from all executable code, with only documentation and attribution references remaining. The error_loggers configuration in lib/bindings/python/src/dynamo/runtime/logging.py should not include "bentoml" since those modules no longer exist.
Applied to files:
components/src/dynamo/vllm/main.py
📚 Learning: 2025-09-17T20:55:06.333Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/indexer.rs:0-0
Timestamp: 2025-09-17T20:55:06.333Z
Learning: When PeaBrane encounters a complex implementation issue that would significantly expand PR scope (like the remove_worker_sender method in lib/llm/src/kv_router/indexer.rs that required thread-safe map updates and proper shard targeting), they prefer to remove the problematic implementation entirely rather than rush a partial fix, deferring the proper solution to a future PR.
Applied to files:
lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-09-21T01:40:52.456Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3155
File: components/backends/vllm/src/dynamo/vllm/main.py:228-233
Timestamp: 2025-09-21T01:40:52.456Z
Learning: In the dynamo codebase, error handling for distributed runtime client initialization (like runtime.namespace().component().endpoint().client()) is handled at the Rust level in the distributed runtime bindings, so Python-level try/catch blocks are not needed and would be redundant.
Applied to files:
lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.
Applied to files:
- `lib/llm/src/kv_router/subscriber.rs`
- `lib/runtime/src/component/client.rs`
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.
Applied to files:
lib/runtime/src/component/client.rs
📚 Learning: 2025-10-07T20:32:27.578Z
Learnt from: keivenchang
Repo: ai-dynamo/dynamo PR: 3266
File: lib/llm/src/http/service/metrics.rs:833-854
Timestamp: 2025-10-07T20:32:27.578Z
Learning: In Axum 0.6+, routers with different state types can be merged successfully using .merge(). Each router maintains its own state internally, so a Router<Arc<MetricsHandlerState>> can be merged with a Router<Arc<service_v2::State>> without compilation issues.
Applied to files:
lib/runtime/src/system_status_server.rs
🧬 Code graph analysis (21)
lib/runtime/src/instances.rs (3)
- lib/runtime/src/distributed.rs (1): `discovery_client` (295-297)
- lib/llm/src/http/service/service_v2.rs (1): `discovery_client` (118-120)
- lib/runtime/src/component/client.rs (1): `instances` (96-101)

lib/llm/src/http/service/health.rs (1)
- lib/runtime/src/instances.rs (1): `list_all_instances` (15-42)

lib/llm/src/http/service/clear_kv_blocks.rs (2)
- lib/runtime/src/distributed.rs (1): `discovery_client` (295-297)
- lib/llm/src/http/service/service_v2.rs (1): `discovery_client` (118-120)

lib/llm/src/local_model.rs (3)
- lib/llm/src/model_card.rs (2): `slug` (272-274), `name` (267-269)
- lib/runtime/src/component.rs (8): `endpoint` (270-278), `component` (513-515), `component` (667-673), `namespace` (258-260), `namespace` (676-683), `name` (262-264), `name` (509-511), `name` (689-694)
- lib/runtime/src/discovery/mod.rs (1): `from_model_card` (83-99)

lib/llm/src/kv_router.rs (1)
- lib/runtime/src/discovery/utils.rs (1): `watch_and_extract_field` (41-106)

lib/runtime/src/discovery/utils.rs (2)
- lib/runtime/src/discovery/mock.rs (3): `new` (19-21), `new` (32-43), `instance_id` (94-96)
- lib/runtime/src/discovery/mod.rs (2): `instance_id` (152-157), `instance_id` (192-192)

lib/runtime/src/discovery/kv_store.rs (2)
- lib/runtime/src/storage/key_value_store.rs (10): `new` (33-35), `new` (74-76), `new` (190-192), `new` (503-516), `key` (78-80), `key_str` (82-84), `value` (86-88), `from_raw` (38-40), `entries` (431-431), `memory` (182-184)
- lib/runtime/src/discovery/mod.rs (5): `instance_id` (152-157), `instance_id` (192-192), `register` (195-195), `list` (199-199), `list_and_watch` (202-202)

lib/runtime/src/storage/key_value_store.rs (2)
- lib/runtime/src/storage/key_value_store/mem.rs (2): `new` (52-56), `new` (60-70)
- lib/runtime/src/transports/etcd.rs (2): `new` (63-106), `new` (489-532)

lib/runtime/src/component/endpoint.rs (2)
- lib/runtime/src/distributed.rs (2): `discovery_client` (295-297), `connection_id` (280-282)
- lib/runtime/src/component.rs (2): `endpoint` (270-278), `subject` (580-582)

lib/runtime/src/distributed.rs (3)
- lib/llm/src/http/service/service_v2.rs (1): `discovery_client` (118-120)
- lib/runtime/src/system_status_server.rs (3): `discovery_metadata` (81-85), `new` (26-31), `new` (65-73)
- lib/runtime/src/discovery/kube.rs (2): `new` (46-51), `new` (173-206)

lib/llm/src/discovery/worker_monitor.rs (3)
- lib/bindings/python/src/dynamo/_core.pyi (1): `ModelDeploymentCard` (433-438)
- lib/runtime/src/discovery/utils.rs (1): `watch_and_extract_field` (41-106)
- lib/llm/src/local_model.rs (1): `card` (329-331)

lib/runtime/tests/kube_client_integration.rs (1)
- lib/runtime/src/discovery/kube.rs (2): `new_for_testing` (214-245), `instance_id` (615-617)

lib/llm/src/http/service/service_v2.rs (2)
- lib/runtime/src/distributed.rs (3): `discovery_client` (295-297), `new` (50-259), `store` (336-338)
- lib/bindings/python/src/dynamo/_core.pyi (1): `CancellationToken` (67-78)

lib/runtime/src/discovery/kube.rs (3)
- lib/runtime/src/distributed.rs (5): `runtime` (272-274), `new` (50-259), `from_settings` (261-264), `from_settings` (361-367), `store` (336-338)
- lib/runtime/src/system_status_server.rs (3): `new` (26-31), `new` (65-73), `port` (41-43)
- lib/runtime/src/discovery/mod.rs (5): `instance_id` (152-157), `instance_id` (192-192), `register` (195-195), `list` (199-199), `list_and_watch` (202-202)

lib/runtime/src/component.rs (2)
- lib/runtime/src/distributed.rs (1): `discovery_client` (295-297)
- lib/runtime/src/component/client.rs (1): `instances` (96-101)

lib/runtime/tests/kube_discovery_integration.rs (1)
- lib/runtime/src/discovery/kube.rs (4): `default` (102-104), `new` (46-51), `new` (173-206), `instance_id` (615-617)

lib/llm/src/kv_router/subscriber.rs (3)
- lib/runtime/src/distributed.rs (1): `discovery_client` (295-297)
- lib/llm/src/http/service/service_v2.rs (1): `discovery_client` (118-120)
- lib/llm/src/kv_router/scoring.rs (1): `worker_id` (26-37)

lib/runtime/src/component/client.rs (3)
- lib/runtime/src/component.rs (9): `path` (249-251), `path` (518-525), `client` (593-599), `new` (658-664), `drt` (203-205), `drt` (467-469), `drt` (630-632), `component` (513-515), `component` (667-673)
- lib/runtime/src/distributed.rs (3): `new` (50-259), `instance_sources` (348-350), `discovery_client` (295-297)
- lib/runtime/src/runtime.rs (1): `secondary` (248-250)

lib/runtime/src/discovery/mock.rs (1)
- lib/runtime/src/discovery/mod.rs (3): `list` (199-199), `instance_id` (152-157), `instance_id` (192-192)

lib/runtime/src/discovery/mod.rs (4)
- lib/runtime/src/discovery/kube.rs (4): `hash_pod_name` (21-26), `instance_id` (615-617), `list` (662-742), `list_and_watch` (744-956)
- lib/runtime/src/discovery/utils.rs (1): `watch_and_extract_field` (41-106)
- lib/runtime/src/discovery/kv_store.rs (3): `instance_id` (80-82), `list` (163-193), `list_and_watch` (195-337)
- lib/runtime/src/discovery/mock.rs (3): `instance_id` (94-96), `list` (110-117), `list_and_watch` (119-166)

lib/runtime/src/system_status_server.rs (2)
- lib/runtime/src/discovery/kube.rs (2): `new` (46-51), `new` (173-206)
- lib/runtime/src/distributed.rs (1): `new` (50-259)
🪛 Checkov (3.2.334)
k8s-test/manifests/test-deployment.yaml
[medium] 3-45: Containers should not run with allowPrivilegeEscalation
(CKV_K8S_20)
[medium] 3-45: Minimize the admission of root containers
(CKV_K8S_23)
🪛 LanguageTool
k8s-test/LOCAL_TESTING.md
[uncategorized] ~208-~208: Possible missing comma found.
Context: ... created a pod without using the helper script and the name doesn't end with a port nu...
(AI_HYDRA_LEO_MISSING_COMMA)
k8s-test/README.md
[uncategorized] ~193-~193: Possible missing comma found.
Context: ...`_endpoints` Test the `list_and_watch()` method which creates a streaming watch. **Wha...
(AI_HYDRA_LEO_MISSING_COMMA)
🪛 markdownlint-cli2 (0.18.1)
k8s-test/LOCAL_TESTING.md
72-72: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
k8s-test/README.md
160-160: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
174-174: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
201-201: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
232-232: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
248-248: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
268-268: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
285-285: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
304-304: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
326-326: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
344-344: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (3)
k8s-test/run-tests.sh (1)
1-4: LGTM!

The script header with strict error handling is appropriate for this test orchestration script.
k8s-test/create-local-test-pod.sh (2)
62-109: LGTM! Well-structured Kubernetes manifests.

The Pod and Service manifests are correctly structured with appropriate labels, readiness probes, and networking configuration for local testing.
123-157: Clear and comprehensive usage instructions.

The instructions provide excellent guidance for users setting up local testing, including all necessary environment variables and cleanup commands.
```diff
 import uvloop
-from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
+# from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
```
Critical: Commented import causes runtime error when kvbm connector is enabled.
The import of get_consolidator_endpoints is commented out (Line 11), but the function is still invoked at line 219. This will raise a NameError at runtime if config.has_connector("kvbm") returns True.
Resolution options:

- If kvbm support is still needed, uncomment the import:

```diff
-# from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
+from kvbm.vllm_integration.consolidator_config import get_consolidator_endpoints
```

- If kvbm is being deprecated as part of the Kubernetes migration, remove the dead code:

```diff
 # Set up consolidator endpoints if KVBM is enabled
 consolidator_endpoints = None
-if config.has_connector("kvbm"):
-    consolidator_endpoints = get_consolidator_endpoints(vllm_config)
-    vllm_config.consolidator_endpoints = consolidator_endpoints
+vllm_config.consolidator_endpoints = None
```

Also applies to: 216-220
🤖 Prompt for AI Agents
In components/src/dynamo/vllm/main.py around lines 11 and 216-220, the import of
get_consolidator_endpoints is commented out but the function is invoked later
when config.has_connector("kvbm") is true, causing a NameError; fix by either
uncommenting and restoring the import at line 11 if kvbm support is required, or
remove the kvbm-specific invocation and related dead code (lines ~216-220) if
kvbm is deprecated, and ensure config.has_connector("kvbm") branches are
consistent with the chosen approach.
```yaml
  selector:
    app: dynamo-local-test
    dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
    dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"
```
Service selector may be too restrictive for multiple pods.
The service selector matches on app, dynamo.nvidia.com/namespace, and dynamo.nvidia.com/component. If you create multiple pods for the same component (e.g., multiple backend instances on different ports), they would all be selected by the same service. This conflicts with the local testing model where each pod should route to a different localhost port.
Given the recommendation to change SERVICE_NAME to include the port, you should also update the selector to be more specific. Consider adding the local-test-port label to the selector:
```diff
   selector:
     app: dynamo-local-test
+    local-test-port: "$PORT"
     dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
     dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"
```

This ensures each service only routes to its corresponding pod.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```yaml
  selector:
    app: dynamo-local-test
    local-test-port: "$PORT"
    dynamo.nvidia.com/namespace: "$DYNAMO_NAMESPACE"
    dynamo.nvidia.com/component: "$DYNAMO_COMPONENT"
```
🤖 Prompt for AI Agents
In k8s-test/create-local-test-pod.sh around lines 99 to 102, the Service
selector is too broad (matching app, dynamo.nvidia.com/namespace, and
dynamo.nvidia.com/component) which will select multiple pods and break per-port
local testing; update the Service selector to include the per-instance label
(e.g., local-test-port: "<port>") so the Service only targets the pod for that
port, and ensure the pod manifest/labels and the script that sets
SERVICE_NAME/port add the matching local-test-port label/value.
| echo "🔍 Checking for test resources in namespace: $NAMESPACE" | ||
|
|
||
| # Check if test resources are deployed | ||
| PODS=$(kubectl get pods -l app=dynamo-test --namespace="$NAMESPACE" --no-headers 2>/dev/null | wc -l) | ||
| if [ "$PODS" -eq 0 ]; then | ||
| echo "⚠️ Test resources not deployed in namespace: $NAMESPACE" | ||
| echo " Run ./deploy.sh $NAMESPACE to create test resources" | ||
| echo " (Tests will still run but may not find any endpoints)" | ||
| echo "" | ||
| else | ||
| echo "✅ Found $PODS test pods in namespace: $NAMESPACE" | ||
| echo "" | ||
| fi |
🧩 Analysis chain
Verify that tests can discover resources despite namespace mismatch.
The script checks for resources in the provided NAMESPACE (default: "default") but notes on line 91 that "the actual KubeDiscoveryClient namespace is determined by POD_NAMESPACE env var in test code." This could lead to confusion if a user provides a namespace argument but the tests look elsewhere.
Consider one of these approaches:
- Pass the NAMESPACE argument to tests via POD_NAMESPACE environment variable
- Remove the NAMESPACE argument and document that POD_NAMESPACE must be set
- Make the warning more prominent when namespace doesn't match
🏁 Script executed:
#!/bin/bash
# Verify if the test code respects a POD_NAMESPACE environment variable
# or if it has a hardcoded namespace
rg -n "POD_NAMESPACE" lib/runtime/tests/Length of output: 0
I'll conduct broader searches to verify the review concern more thoroughly.
---
🏁 Script executed:
#!/bin/bash
# Find the run-tests.sh file and examine its content
fd -name "run-tests.sh" -type f
Length of output: 230
🏁 Script executed:
#!/bin/bash
# Search for POD_NAMESPACE references across the entire codebase
rg -n "POD_NAMESPACE" --type-list | head -20Length of output: 389
🏁 Script executed:
#!/bin/bash
# Search for POD_NAMESPACE usage without path restrictions
rg -n "POD_NAMESPACE" -iLength of output: 2757
🏁 Script executed:
#!/bin/bash
# Look for namespace-related environment variables in test or runtime code
rg -n "NAMESPACE" --type=sh -C3 | head -50Length of output: 2590
I need to see how tests are actually executed in the script to complete verification.
---
🏁 Script executed:
#!/bin/bash
# Examine the full run-tests.sh to understand test execution
cat k8s-test/run-tests.sh
Length of output: 3497
Pass NAMESPACE argument to tests via POD_NAMESPACE environment variable.
The NAMESPACE argument (line 23) is only used to check resource existence; it's never passed to the actual test execution. Tests read POD_NAMESPACE from the environment (lib/runtime/src/discovery/kube.rs:128), which defaults to "default" if unset. This creates a mismatch: a user providing ./run-tests.sh client "" my-namespace still runs tests against POD_NAMESPACE (not my-namespace).
Add before each cargo test invocation:
export POD_NAMESPACE="$NAMESPACE"

This aligns test behavior with the user's namespace argument and eliminates confusion. The warning on line 91 can then be removed since the script guarantees namespace consistency.
🤖 Prompt for AI Agents
In k8s-test/run-tests.sh around lines 25 to 37, the script only uses the
NAMESPACE argument to check for deployed pods but never exports it for the
tests; this causes tests to use POD_NAMESPACE's default instead of the provided
NAMESPACE. Fix by exporting POD_NAMESPACE="$NAMESPACE" before any cargo test
invocations (add an export line before each cargo test call or once globally
before running tests) so the test process reads the intended namespace; remove
the now-unnecessary warning about namespace mismatch.
```rust
let instances_filtered: Vec<dynamo_runtime::component::Instance> = discovery_instances
    .into_iter()
    .filter(|instance| instance.endpoint == CLEAR_KV_ENDPOINT)
    .collect::<Vec<_>>();

if instances_filtered.is_empty() {
    let found_endpoints: Vec<String> = instances
        .iter()
        .map(|instance| instance.endpoint.clone())
        .collect();
    add_worker_result(
        false,
        entry_name,
        &format!(
            "Worker group doesn't support clear_kv_blocks. Supported endpoints: {}",
            found_endpoints.join(", ")
        ),
        namespace,
        component,
        None,
    );
    continue;
}
.map(|di| match di {
    dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => instance,
})
.collect();
```
Handle all DiscoveryInstance variants to prevent panic.
The pattern match only handles DiscoveryInstance::Endpoint but doesn't account for other possible variants (such as ModelCard). If discovery_client.list() returns a non-Endpoint variant, this code will panic.
Apply this diff to handle all variants gracefully:
```diff
 let instances_filtered: Vec<dynamo_runtime::component::Instance> = discovery_instances
     .into_iter()
-    .map(|di| match di {
-        dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => instance,
-    })
+    .filter_map(|di| match di {
+        dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
+        _ => {
+            tracing::warn!("Unexpected discovery instance variant for clear_kv_blocks endpoint");
+            None
+        }
+    })
     .collect();
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let instances_filtered: Vec<dynamo_runtime::component::Instance> = discovery_instances
    .into_iter()
    .filter_map(|di| match di {
        dynamo_runtime::discovery::DiscoveryInstance::Endpoint(instance) => Some(instance),
        _ => {
            tracing::warn!("Unexpected discovery instance variant for clear_kv_blocks endpoint");
            None
        }
    })
    .collect();
```
🤖 Prompt for AI Agents
In lib/llm/src/http/service/clear_kv_blocks.rs around lines 187 to 192, the
current map only handles DiscoveryInstance::Endpoint and will panic on other
variants; change the logic to handle all variants by using a filter_map (or
match that returns Option) that extracts the Endpoint instance and skips
non-Endpoint variants (optionally logging or counting skipped variants) so the
code never panics if discovery_client.list() returns ModelCard or other
variants.
```rust
// Initialize discovery client backed by KV store
// Create a cancellation token for the discovery client's watch streams
let discovery_client = {
    let cancel_token = CancellationToken::new();
    Arc::new(KVStoreDiscoveryClient::new(
        store.clone(),
        cancel_token,
    )) as Arc<dyn DiscoveryClient>
};

Self {
    manager,
    metrics: Arc::new(Metrics::default()),
    store,
    discovery_client,
    flags: StateFlags {
```
Don't hardcode a KV discovery client here
By constructing a KVStoreDiscoveryClient unconditionally, this service ignores the Kubernetes discovery backend that the rest of the runtime now supports. When DYN_DISCOVERY_BACKEND=kubernetes (the main scenario this PR enables), the KV store remains empty, so the HTTP service never sees any endpoints or model cards—client-facing APIs will report an empty model list even though pods are registered. Please source the discovery client from the same backend selection logic as DistributedRuntime (e.g., accept an Arc<dyn DiscoveryClient> from the builder or share the runtime’s already-initialized client) instead of hardcoding the KV implementation.
🤖 Prompt for AI Agents
In lib/llm/src/http/service/service_v2.rs around lines 77 to 92, the code
unconditionally constructs a KVStoreDiscoveryClient which bypasses the
Kubernetes discovery backend; instead of hardcoding KV, accept or reuse an
Arc<dyn DiscoveryClient> from the builder or from the already-initialized
DistributedRuntime and assign that to discovery_client. Modify the service
constructor to take an Arc<dyn DiscoveryClient> parameter (or a reference to the
runtime that exposes it), remove the KVStoreDiscoveryClient::new() call here,
and wire the provided discovery client into Self so the HTTP service respects
DYN_DISCOVERY_BACKEND selection. Ensure cancellation token handling remains
compatible with the provided client (or move token creation to the provider) and
update callers/tests to pass the appropriate DiscoveryClient.
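As a rough illustration of the injection this comment asks for, here is a minimal sketch of a builder that accepts the runtime's already-selected discovery client instead of constructing a KV-backed one itself. The `State`/`StateBuilder` shapes, method names, and error handling below are assumptions for illustration, not the actual `service_v2` API.

```rust
use std::sync::Arc;

// Illustrative stand-in; the real trait lives in dynamo_runtime::discovery.
trait DiscoveryClient: Send + Sync {}

struct State {
    discovery_client: Arc<dyn DiscoveryClient>,
}

struct StateBuilder {
    discovery_client: Option<Arc<dyn DiscoveryClient>>,
}

impl StateBuilder {
    fn new() -> Self {
        Self { discovery_client: None }
    }

    // The caller (e.g. the DistributedRuntime) injects whichever backend it
    // selected via DYN_DISCOVERY_BACKEND, so the HTTP service never picks one.
    fn discovery_client(mut self, client: Arc<dyn DiscoveryClient>) -> Self {
        self.discovery_client = Some(client);
        self
    }

    fn build(self) -> Result<State, &'static str> {
        let discovery_client = self
            .discovery_client
            .ok_or("a discovery client must be provided by the runtime")?;
        Ok(State { discovery_client })
    }
}
```

With this shape, the Kubernetes backend flows through automatically because the service only ever sees the trait object it was handed.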
```rust
let spec = DiscoverySpec::from_model_card(
    endpoint.component().namespace().name().to_string(),
    endpoint.component().name().to_string(),
    endpoint.name().to_string(),
    &self.card,
)?;
let _instance = discovery.register(spec).await?;

tracing::warn!(
    "DISCOVERY_VALIDATION: model_card_registered: namespace={}, component={}, endpoint={}, model_name={}",
    endpoint.component().namespace().name(),
    endpoint.component().name(),
    endpoint.name(),
    self.card.name()
);
```
Registration handle is dropped immediately
DiscoveryClient::register returns a handle (DiscoveryInstance) whose Drop un-registers the spec. Assigning it to _instance and letting it go out of scope means the model is unregistered as soon as attach returns, so ingress will never see it. Please retain the handle for as long as the model should stay registered (e.g., store it on LocalModel and release it on detach/shutdown).
🤖 Prompt for AI Agents
In lib/llm/src/local_model.rs around lines 418 to 432, the DiscoveryInstance
returned by discovery.register is assigned to a temporary _instance and
immediately dropped, which causes the registration to be removed; retain the
handle on the LocalModel so the registration stays active by adding a field
(e.g., discovery_instance: Option<DiscoveryInstance>) to the LocalModel struct,
store the returned handle in that field in attach, and clear/drop it explicitly
in detach/shutdown to unregister when the model should actually be removed.
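A minimal sketch of the ownership change suggested above: keep the returned guard on the model so the registration lives until an explicit detach. The `LocalModel`/`DiscoveryInstance` shapes here are simplified stand-ins, not the real types.

```rust
// Simplified stand-in: dropping the guard is what un-registers the spec.
struct DiscoveryInstance;

struct LocalModel {
    // Holding the guard keeps the model visible to discovery.
    discovery_instance: Option<DiscoveryInstance>,
}

impl LocalModel {
    // attach() stores the handle instead of letting it drop on return.
    fn attach(&mut self, handle: DiscoveryInstance) {
        self.discovery_instance = Some(handle);
    }

    // detach() drops the guard explicitly, un-registering the model on purpose.
    fn detach(&mut self) {
        self.discovery_instance = None;
    }
}
```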
```rust
// Fast path: check cache
{
    let cache = self.metadata_cache.read().await;
    if let Some(cached) = cache.get(&instance_id) {
        tracing::debug!(
            "Cache hit for pod_name={}, instance_id={:x}",
            pod_name,
            instance_id
        );
        return Ok(cached.metadata.clone());
    }
}

// Cache miss: fetch from remote pod
tracing::debug!(
    "Cache miss for pod_name={}, instance_id={:x}, fetching from {}",
    pod_name,
    instance_id,
    target_host
);
self.fetch_and_cache_from_host(instance_id, pod_name, &target_host).await
}
```
Cache must refresh to avoid permanently stale metadata.
get_metadata returns early on any cache hit, and the cache is only cleared when a pod disappears. If a pod adds or updates endpoints/model cards while staying ready, every other node will keep serving the old metadata forever (the fetched_at timestamp is never consulted). list and list_and_watch both rely on this path, so discovery data goes stale cluster-wide. Please add an expiry/version check (e.g., TTL based on fetched_at, ETag, or ResourceVersion) or always refetch when the EndpointSlice reports a change so metadata stays current.
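One possible shape for the TTL option mentioned above, assuming the cache entry keeps the `fetched_at` timestamp it already records; the 30-second TTL, the entry struct, and the `is_fresh` helper are illustrative assumptions, not the existing `kube.rs` code.

```rust
use std::time::{Duration, Instant};

// Illustrative cache entry; the real one stores the metadata payload plus
// the fetched_at timestamp referenced in the comment above.
struct CachedMetadata {
    metadata: String, // stand-in for the real metadata type
    fetched_at: Instant,
}

// Assumed TTL; tune to how quickly endpoint/model-card changes must propagate.
const METADATA_TTL: Duration = Duration::from_secs(30);

// Serve the cached copy only while it is fresh; a stale entry falls through
// to the remote fetch so updated metadata is eventually picked up.
fn is_fresh(entry: &CachedMetadata) -> bool {
    entry.fetched_at.elapsed() < METADATA_TTL
}
```

An ETag or ResourceVersion comparison would achieve the same goal without a fixed expiry, at the cost of one conditional request per check.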
```rust
// Extract ALL current instances from ALL slices
let current_instances: HashSet<u64> = all_slices.iter()
    .flat_map(Self::extract_instance_ids)
    .collect();

// Build endpoint info map for fetching
let mut endpoint_info_map = HashMap::new();
for slice in &all_slices {
    let endpoint_infos = Self::extract_endpoint_info(slice);
    for (instance_id, pod_name, pod_ip) in endpoint_infos {
        endpoint_info_map.entry(instance_id)
            .or_insert((pod_name, pod_ip));
    }
}

// Diff against previous state
let prev_instances = known_instances.read().await.clone();
let added: Vec<_> = current_instances.difference(&prev_instances).copied().collect();
let removed: Vec<_> = prev_instances.difference(&current_instances).copied().collect();

if !added.is_empty() || !removed.is_empty() {
    tracing::debug!(
        stream_id = %stream_id,
        added = added.len(),
        removed = removed.len(),
        total = current_instances.len(),
        "State diff computed"
    );
}

// Update known_instances before fetching
*known_instances.write().await = current_instances.clone();

// Fetch metadata for new instances concurrently
let fetch_futures: Vec<_> = added.iter().filter_map(|&instance_id| {
    endpoint_info_map.get(&instance_id).map(|(pod_name, pod_ip)| {
        let client = client.clone();
        let pod_name = pod_name.clone();
        let pod_ip = pod_ip.clone();
        let key_clone = key_clone.clone();
        let known_instances = known_instances.clone();

        async move {
            match client.get_metadata(&pod_name, &pod_ip).await {
                Ok(metadata) => {
                    // Fetch-after-delete guard: check if still in known set
                    if known_instances.read().await.contains(&instance_id) {
                        let instances = Self::filter_metadata(&metadata, &key_clone, instance_id);
                        Some((instance_id, instances))
                    } else {
                        tracing::debug!(
                            stream_id = %stream_id,
                            instance_id = format!("{:x}", instance_id),
                            "Instance removed before fetch completed, skipping"
                        );
                        None
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        stream_id = %stream_id,
                        pod_name = %pod_name,
                        error = %e,
                        "Failed to fetch metadata"
                    );
                    None
                }
            }
        }
    })
}).collect();

// Fetch concurrently and emit Added events
let results: Vec<_> = futures::stream::iter(fetch_futures)
    .buffer_unordered(20)
    .collect()
    .await;

for result in results {
    if let Some((_instance_id, instances)) = result {
        for instance in instances {
            tracing::info!(
                stream_id = %stream_id,
                instance_id = format!("{:x}", instance.instance_id()),
                "Emitting Added event"
            );
            if tx.send(Ok(DiscoveryEvent::Added(instance))).is_err() {
                tracing::debug!(stream_id = %stream_id, "Receiver dropped, stopping monitor");
                return;
            }
        }
    }
}

// Emit Removed events
for instance_id in removed {
    tracing::info!(
        stream_id = %stream_id,
        instance_id = format!("{:x}", instance_id),
        "Emitting Removed event"
    );
    client.invalidate_cache(instance_id).await;
    if tx.send(Ok(DiscoveryEvent::Removed(instance_id))).is_err() {
        tracing::debug!(stream_id = %stream_id, "Receiver dropped, stopping monitor");
        return;
    }
```
Watch loop never emits updates for pods whose metadata changes.
Here the diff only tracks instance_ids (pod hashes). When a pod updates its advertised endpoints/model cards but remains in the EndpointSlice, it is neither in added nor removed, so you never refetch its metadata and no Added/Removed events fire. As a result, downstream consumers never learn about the change. Please trigger a refresh when an EndpointSlice changes (e.g., compare ResourceVersions/per-address hashes, or refetch on every Applied event) so metadata updates propagate even when the pod itself stays online.
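A sketch of one way to detect in-place changes, as suggested above: track a per-instance fingerprint (content hash, ResourceVersion, or ETag) alongside the id and re-emit events when it differs. The hashing scheme and function names below are assumptions, not the existing `KubeDiscoveryClient` code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Fingerprint some serialized representation of the instance's metadata
// (a ResourceVersion or ETag from the EndpointSlice would work equally well).
fn fingerprint(metadata: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    metadata.hash(&mut hasher);
    hasher.finish()
}

// Returns true when the instance is new or its metadata changed, i.e. when
// the watch loop should refetch and re-emit events for it.
fn needs_refresh(
    known: &mut HashMap<u64, u64>, // instance_id -> last seen fingerprint
    instance_id: u64,
    metadata: &str,
) -> bool {
    let fp = fingerprint(metadata);
    match known.insert(instance_id, fp) {
        Some(previous) if previous == fp => false, // unchanged: no event
        _ => true, // new or modified: emit Removed + Added (or an Updated event)
    }
}
```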