Skip to content

Commit 2f9812a

Browse files
fix: fix bug in diffing logic in list_and_watch (#5318)
Signed-off-by: mohammedabdulwahhab <[email protected]>
1 parent 91eb0ed commit 2f9812a

File tree

11 files changed

+373
-226
lines changed

11 files changed

+373
-226
lines changed

lib/bindings/c/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,12 +1306,12 @@ fn spawn_prefill_watcher(
13061306
}
13071307
}
13081308
}
1309-
DiscoveryEvent::Removed(instance_id) => {
1309+
DiscoveryEvent::Removed(id) => {
13101310
// Log removal for observability
13111311
// Note: The PrefillRouter remains active - worker availability
13121312
// is handled dynamically by the underlying Client's instance tracking
13131313
tracing::debug!(
1314-
instance_id = instance_id,
1314+
instance_id = id.instance_id(),
13151315
"Prefill worker instance removed from discovery"
13161316
);
13171317
}

lib/llm/src/discovery/model_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,14 @@ impl ModelManager {
320320
.ok_or(ModelManagerError::ModelNotFound(model.to_string()))
321321
}
322322

323-
/// Save a ModelDeploymentCard from an instance's ModelDeploymentCard key so we can fetch it later when the key is
323+
/// Save a ModelDeploymentCard from an instance's key so we can fetch it later when the key is
324324
/// deleted.
325325
pub fn save_model_card(&self, key: &str, card: ModelDeploymentCard) -> anyhow::Result<()> {
326326
self.cards.lock().insert(key.to_string(), card);
327327
Ok(())
328328
}
329329

330-
/// Remove and return model card for this instance's etcd key. We do this when the instance stops.
330+
/// Remove and return model card for this instance's key. We do this when the instance stops.
331331
pub fn remove_model_card(&self, key: &str) -> Option<ModelDeploymentCard> {
332332
self.cards.lock().remove(key)
333333
}

lib/llm/src/discovery/watcher.rs

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use futures::StreamExt;
1010

1111
use dynamo_runtime::{
1212
DistributedRuntime,
13-
discovery::{DiscoveryEvent, DiscoveryInstance, DiscoveryQuery, DiscoveryStream},
13+
discovery::{
14+
DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId, DiscoveryQuery, DiscoveryStream,
15+
ModelCardInstanceId,
16+
},
1417
pipeline::{
1518
ManyOut, Operator, RouterMode, SegmentSource, ServiceBackend, SingleIn, Source,
1619
network::egress::push_router::PushRouter,
@@ -119,23 +122,26 @@ impl ModelWatcher {
119122

120123
match event {
121124
DiscoveryEvent::Added(instance) => {
122-
// Extract EndpointId, instance_id, and card from the discovery instance
123-
let (endpoint_id, instance_id, mut card) = match &instance {
125+
// Extract ModelCardInstanceId and card from the discovery instance
126+
let (mcid, mut card) = match &instance {
124127
DiscoveryInstance::Model {
125128
namespace,
126129
component,
127130
endpoint,
128131
instance_id,
132+
model_suffix,
129133
..
130134
} => {
131-
let eid = EndpointId {
135+
let mcid = ModelCardInstanceId {
132136
namespace: namespace.clone(),
133137
component: component.clone(),
134-
name: endpoint.clone(),
138+
endpoint: endpoint.clone(),
139+
instance_id: *instance_id,
140+
model_suffix: model_suffix.clone(),
135141
};
136142

137143
match instance.deserialize_model::<ModelDeploymentCard>() {
138-
Ok(card) => (eid, *instance_id, card),
144+
Ok(card) => (mcid, card),
139145
Err(err) => {
140146
tracing::error!(%err, instance_id, "Failed to deserialize model card");
141147
continue;
@@ -153,10 +159,10 @@ impl ModelWatcher {
153159
// Filter by namespace if target_namespace is specified
154160
if !global_namespace
155161
&& let Some(target_ns) = target_namespace
156-
&& endpoint_id.namespace != target_ns
162+
&& mcid.namespace != target_ns
157163
{
158164
tracing::debug!(
159-
model_namespace = endpoint_id.namespace,
165+
model_namespace = mcid.namespace,
160166
target_namespace = target_ns,
161167
"Skipping model from different namespace"
162168
);
@@ -185,34 +191,39 @@ impl ModelWatcher {
185191
continue;
186192
}
187193

188-
// Use instance_id as the HashMap key (simpler and sufficient since keys are opaque)
189-
let key = format!("{:x}", instance_id);
190-
191-
match self.handle_put(&key, &endpoint_id, &mut card).await {
194+
match self.handle_put(&mcid, &mut card).await {
192195
Ok(()) => {
193196
tracing::info!(
194197
model_name = card.name(),
195-
namespace = endpoint_id.namespace,
198+
namespace = mcid.namespace,
196199
"added model"
197200
);
198201
self.notify_on_model.notify_waiters();
199202
}
200203
Err(err) => {
201204
tracing::error!(
202205
model_name = card.name(),
203-
namespace = endpoint_id.namespace,
206+
namespace = mcid.namespace,
204207
error = format!("{err:#}"),
205208
"Error adding model from discovery",
206209
);
207210
}
208211
}
209212
}
210-
DiscoveryEvent::Removed(instance_id) => {
211-
// Use instance_id hex as the HashMap key (matches what we saved with)
212-
let key = format!("{:x}", instance_id);
213+
DiscoveryEvent::Removed(id) => {
214+
// Extract ModelCardInstanceId from the removal event
215+
let model_card_instance_id = match &id {
216+
DiscoveryInstanceId::Model(mcid) => mcid,
217+
DiscoveryInstanceId::Endpoint(_) => {
218+
tracing::error!(
219+
"Unexpected discovery instance type in removal (expected Model)"
220+
);
221+
continue;
222+
}
223+
};
213224

214225
match self
215-
.handle_delete(&key, target_namespace, global_namespace)
226+
.handle_delete(model_card_instance_id, target_namespace, global_namespace)
216227
.await
217228
{
218229
Ok(Some(model_name)) => {
@@ -234,14 +245,15 @@ impl ModelWatcher {
234245
/// Returns the name of the model we just deleted, if any.
235246
async fn handle_delete(
236247
&self,
237-
key: &str,
248+
mcid: &ModelCardInstanceId,
238249
target_namespace: Option<&str>,
239250
is_global_namespace: bool,
240251
) -> anyhow::Result<Option<String>> {
241-
let card = match self.manager.remove_model_card(key) {
252+
let key = mcid.to_path();
253+
let card = match self.manager.remove_model_card(&key) {
242254
Some(card) => card,
243255
None => {
244-
anyhow::bail!("Missing ModelDeploymentCard for {key}");
256+
anyhow::bail!("Missing ModelDeploymentCard for {}", key);
245257
}
246258
};
247259
let model_name = card.name().to_string();
@@ -325,20 +337,20 @@ impl ModelWatcher {
325337
// models.
326338
async fn handle_put(
327339
&self,
328-
key: &str,
329-
endpoint_id: &EndpointId,
340+
mcid: &ModelCardInstanceId,
330341
card: &mut ModelDeploymentCard,
331342
) -> anyhow::Result<()> {
332343
card.download_config().await?;
333344

334345
let component = self
335346
.drt
336-
.namespace(&endpoint_id.namespace)?
337-
.component(&endpoint_id.component)?;
338-
let endpoint = component.endpoint(&endpoint_id.name);
347+
.namespace(&mcid.namespace)?
348+
.component(&mcid.component)?;
349+
let endpoint = component.endpoint(&mcid.endpoint);
339350
let client = endpoint.client().await?;
340351
tracing::debug!(model_name = card.name(), "adding model");
341-
self.manager.save_model_card(key, card.clone())?;
352+
self.manager
353+
.save_model_card(&mcid.to_path(), card.clone())?;
342354

343355
// Skip duplicate registrations based on model type.
344356
// Prefill and decode models are tracked separately, so registering one
@@ -352,7 +364,7 @@ impl ModelWatcher {
352364
if already_registered {
353365
tracing::debug!(
354366
model_name = card.name(),
355-
namespace = endpoint_id.namespace,
367+
namespace = mcid.namespace,
356368
model_type = %card.model_type,
357369
"Model already registered, skipping"
358370
);
@@ -372,7 +384,7 @@ impl ModelWatcher {
372384
// A model that expects pre-processed requests meaning it's up to us whether we
373385
// handle Chat or Completions requests, so handle whatever the model supports.
374386

375-
let endpoint = component.endpoint(&endpoint_id.name);
387+
let endpoint = component.endpoint(&mcid.endpoint);
376388
let kv_chooser = if self.router_config.router_mode == RouterMode::KV {
377389
Some(
378390
self.manager

lib/llm/src/kv_router/subscriber.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,10 +564,12 @@ pub async fn start_kv_router_background(
564564
continue;
565565
};
566566

567-
let DiscoveryEvent::Removed(worker_id) = discovery_event else {
567+
let DiscoveryEvent::Removed(id) = discovery_event else {
568568
continue;
569569
};
570570

571+
let worker_id = id.instance_id();
572+
571573
tracing::warn!(
572574
"DISCOVERY: Generate endpoint instance removed, removing worker {worker_id}"
573575
);
@@ -642,11 +644,13 @@ pub async fn start_kv_router_background(
642644
continue;
643645
};
644646

645-
let DiscoveryEvent::Removed(router_instance_id) = router_event else {
647+
let DiscoveryEvent::Removed(id) = router_event else {
646648
// We only care about removals for cleaning up consumers
647649
continue;
648650
};
649651

652+
let router_instance_id = id.instance_id();
653+
650654
// The consumer UUID is the instance_id in hex format
651655
let consumer_to_delete = router_instance_id.to_string();
652656

@@ -708,7 +712,8 @@ async fn handle_worker_discovery(
708712
}
709713
}
710714
}
711-
DiscoveryEvent::Removed(worker_id) => {
715+
DiscoveryEvent::Removed(id) => {
716+
let worker_id = id.instance_id();
712717
tracing::warn!("DISCOVERY: Worker {worker_id} removed, removing from router indexer");
713718

714719
if let Err(e) = remove_worker_tx.send(worker_id).await {

lib/runtime/src/component/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use arc_swap::ArcSwap;
99
use futures::StreamExt;
1010
use tokio::net::unix::pipe::Receiver;
1111

12-
use crate::discovery::{DiscoveryEvent, DiscoveryInstance};
12+
use crate::discovery::{DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId};
1313
use crate::{
1414
component::{Endpoint, Instance},
1515
pipeline::async_trait,
@@ -255,8 +255,8 @@ impl Client {
255255
map.insert(instance.instance_id, instance);
256256
}
257257
}
258-
DiscoveryEvent::Removed(instance_id) => {
259-
map.remove(&instance_id);
258+
DiscoveryEvent::Removed(id) => {
259+
map.remove(&id.instance_id());
260260
}
261261
}
262262

0 commit comments

Comments
 (0)