Skip to content

Commit 4864139

Browse files
authored
Ease deployment from old routing table to new (quickwit-oss#6207)
1 parent 290d055 commit 4864139

File tree

2 files changed: +75 additions, -18 deletions

quickwit/quickwit-ingest/src/ingest_v2/router.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -672,12 +672,17 @@ mod tests {
672672

673673
{
674674
let mut state_guard = router.state.lock().await;
675-
state_guard.routing_table.apply_capacity_update(
676-
"test-ingester-0".into(),
675+
state_guard.routing_table.merge_from_shards(
677676
IndexUid::for_test("test-index-0", 0),
678677
"test-source".to_string(),
679-
8,
680-
1,
678+
vec![Shard {
679+
index_uid: Some(IndexUid::for_test("test-index-0", 0)),
680+
source_id: "test-source".to_string(),
681+
shard_id: Some(ShardId::from(1u64)),
682+
shard_state: ShardState::Open as i32,
683+
leader_id: "test-ingester-0".to_string(),
684+
..Default::default()
685+
}],
681686
);
682687
}
683688

quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ pub(super) struct IngesterNode {
4141
#[derive(Debug, Default)]
4242
pub(super) struct RoutingEntry {
4343
pub nodes: HashMap<NodeId, IngesterNode>,
44+
/// Whether this entry has been seeded from a control plane response. During a rolling
45+
/// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table
46+
/// before the router ever asks the CP, causing it to miss old-version nodes. This flag
47+
/// ensures the router asks the CP at least once per (index, source) pair.
48+
seeded_from_cp: bool,
4449
}
4550

4651
/// Given a slice of candidates, picks the better of two random choices.
@@ -180,6 +185,10 @@ impl RoutingTable {
180185
let Some(entry) = self.table.get(&key) else {
181186
return false;
182187
};
188+
// Routers must sync with the control plane at least once per (index, source).
189+
if !entry.seeded_from_cp {
190+
return false;
191+
}
183192
entry.nodes.values().any(|node| {
184193
node.capacity_score > 0
185194
&& node.open_shard_count > 0
@@ -250,6 +259,7 @@ impl RoutingTable {
250259
open_shard_count,
251260
});
252261
}
262+
entry.seeded_from_cp = true;
253263
}
254264
}
255265

@@ -327,18 +337,33 @@ mod tests {
327337
fn test_has_open_nodes() {
328338
let mut table = RoutingTable::default();
329339
let pool = IngesterPool::default();
340+
let index_uid = IndexUid::for_test("test-index", 0);
330341

331342
// Empty table.
332343
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
333344

345+
// Seed from CP so has_open_nodes can return true.
346+
let shards = vec![
347+
Shard {
348+
index_uid: Some(index_uid.clone()),
349+
source_id: "test-source".to_string(),
350+
shard_id: Some(ShardId::from(1u64)),
351+
shard_state: ShardState::Open as i32,
352+
leader_id: "node-1".to_string(),
353+
..Default::default()
354+
},
355+
Shard {
356+
index_uid: Some(index_uid.clone()),
357+
source_id: "test-source".to_string(),
358+
shard_id: Some(ShardId::from(2u64)),
359+
shard_state: ShardState::Open as i32,
360+
leader_id: "node-2".to_string(),
361+
..Default::default()
362+
},
363+
];
364+
table.merge_from_shards(index_uid.clone(), "test-source".into(), shards);
365+
334366
// Node exists but is not in pool.
335-
table.apply_capacity_update(
336-
"node-1".into(),
337-
IndexUid::for_test("test-index", 0),
338-
"test-source".into(),
339-
8,
340-
3,
341-
);
342367
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
343368

344369
// Node is in pool → true.
@@ -350,13 +375,6 @@ mod tests {
350375
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
351376

352377
// Second node available → true despite first being unavailable.
353-
table.apply_capacity_update(
354-
"node-2".into(),
355-
IndexUid::for_test("test-index", 0),
356-
"test-source".into(),
357-
6,
358-
2,
359-
);
360378
pool.insert("node-2".into(), mocked_ingester(None));
361379
assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
362380

@@ -371,6 +389,40 @@ mod tests {
371389
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
372390
}
373391

392+
#[test]
393+
fn test_has_open_nodes_requires_cp_seed() {
394+
let mut table = RoutingTable::default();
395+
let pool = IngesterPool::default();
396+
pool.insert("node-1".into(), mocked_ingester(None));
397+
398+
// Chitchat broadcast populates the entry, but has_open_nodes still returns false
399+
// because the entry hasn't been seeded from the control plane yet.
400+
table.apply_capacity_update(
401+
"node-1".into(),
402+
IndexUid::for_test("test-index", 0),
403+
"test-source".into(),
404+
8,
405+
3,
406+
);
407+
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
408+
409+
// After merge_from_shards (CP response), has_open_nodes returns true.
410+
let shards = vec![Shard {
411+
index_uid: Some(IndexUid::for_test("test-index", 0)),
412+
source_id: "test-source".to_string(),
413+
shard_id: Some(ShardId::from(1u64)),
414+
shard_state: ShardState::Open as i32,
415+
leader_id: "node-1".to_string(),
416+
..Default::default()
417+
}];
418+
table.merge_from_shards(
419+
IndexUid::for_test("test-index", 0),
420+
"test-source".into(),
421+
shards,
422+
);
423+
assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
424+
}
425+
374426
#[test]
375427
fn test_pick_node_prefers_same_az() {
376428
let mut table = RoutingTable::new(Some("az-1".to_string()));

0 commit comments

Comments (0)