Skip to content

Commit 09e7159

Browse files
Fix infinite loop OOM bug caused by rebalancing shards from unavailable ingesters (quickwit-oss#6078)
Co-authored-by: fulmicoton <paul.masurel@datadoghq.com>
1 parent 2acab20 commit 09e7159

File tree

1 file changed

+129
-106
lines changed

1 file changed

+129
-106
lines changed

quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs

Lines changed: 129 additions & 106 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use std::time::Duration;
2323
use fnv::FnvHashSet;
2424
use futures::StreamExt;
2525
use futures::stream::FuturesUnordered;
26-
use itertools::{Itertools, MinMaxResult};
26+
use itertools::{Itertools as _, MinMaxResult};
2727
use quickwit_actors::Mailbox;
2828
use quickwit_common::Progress;
2929
use quickwit_common::pretty::PrettySample;
@@ -1103,56 +1103,42 @@ impl IngestController {
11031103
Ok(Some(tokio::spawn(close_shards_and_send_callback_fut)))
11041104
}
11051105

1106-
/// This method just "computes" the number of shards to move for rebalance.
1107-
/// It does not run any side effect except logging.
1106+
/// Computes shards that need to be rebalanced.
11081107
///
1109-
/// TODO: We consider the number of available (i.e. alive according to chitchat) ingesters for
1110-
/// this computation, but deal with the entire number of shards here.
1111-
/// This could cause problems when dealing with a lot of unavailable ingesters.
1108+
/// This function identifies which shards should be moved to achieve a balance across available
1109+
/// ingesters.
1110+
/// It does not mutate any state. It just identifies the list of shards
1111+
/// that need to be rebalanced.
11121112
///
1113-
/// On the other hand it biases things the "right way":
1114-
/// If we are missing some ingesters, their shards should still be in the model, but they should
1115-
/// be missing from the ingester pool.
1116-
///
1117-
/// As a result `target_num_open_shards_per_leader` should be inflated.
1118-
///
1119-
/// TODO: This implementation does not consider replicas.
1113+
/// Unfortunately, we cannot move shards that are on unavailable ingesters.
1114+
/// The closing operation can only be done by the leader of that shard.
1115+
/// For these reasons, we exclude these shards from the rebalance process.
11201116
fn compute_shards_to_rebalance(&self, model: &ControlPlaneModel) -> Vec<Shard> {
1121-
let ingester_ids: Vec<NodeId> = self.ingester_pool.keys();
1122-
let num_ingesters = ingester_ids.len();
1123-
1124-
if num_ingesters == 0 {
1125-
debug!("no ingesters available");
1126-
return Vec::new();
1127-
}
1128-
if num_ingesters < 2 {
1129-
return Vec::new();
1130-
}
1131-
let mut num_open_shards: usize = 0;
1132-
let mut per_leader_open_shards: HashMap<&str, Vec<&Shard>> = HashMap::new();
1117+
let mut per_available_ingester_shards: HashMap<NodeId, Vec<&Shard>> = self
1118+
.ingester_pool
1119+
.keys()
1120+
.into_iter()
1121+
.map(|ingester_id| (ingester_id, Vec::new()))
1122+
.collect();
11331123

1124+
let mut num_available_shards: usize = 0;
11341125
for shard in model.all_shards() {
1135-
if shard.is_open() {
1136-
num_open_shards += 1;
1137-
per_leader_open_shards
1138-
.entry(&shard.leader_id)
1139-
.or_default()
1140-
.push(&shard.shard);
1126+
if !shard.is_open() {
1127+
continue;
1128+
}
1129+
let leader_id_ref = NodeIdRef::from_str(&shard.leader_id);
1130+
if let Some(shards) = per_available_ingester_shards.get_mut(leader_id_ref) {
1131+
// We only consider shards that are on available ingesters
1132+
// because we won't be able to move shards that are not reachable.
1133+
num_available_shards += 1;
1134+
shards.push(&shard.shard)
11411135
}
11421136
}
1143-
for ingester_id in &ingester_ids {
1144-
per_leader_open_shards
1145-
.entry(ingester_id.as_str())
1146-
.or_default();
1147-
}
1148-
let target_num_open_shards_per_leader = num_open_shards as f32 / num_ingesters as f32;
1149-
let max_num_open_shards_per_leader =
1150-
f32::ceil(target_num_open_shards_per_leader * 1.1) as usize;
1151-
let min_num_open_shards_per_leader =
1152-
f32::floor(target_num_open_shards_per_leader * 0.9) as usize;
1137+
1138+
let num_available_ingesters = per_available_ingester_shards.len();
11531139

11541140
let mut rng = rng();
1155-
let mut per_leader_open_shard_shuffled: Vec<Vec<&Shard>> = per_leader_open_shards
1141+
let mut per_leader_open_shards_shuffled: Vec<Vec<&Shard>> = per_available_ingester_shards
11561142
.into_values()
11571143
.map(|mut shards| {
11581144
shards.shuffle(&mut rng);
@@ -1162,40 +1148,48 @@ impl IngestController {
11621148

11631149
let mut shards_to_rebalance: Vec<Shard> = Vec::new();
11641150

1165-
loop {
1166-
let MinMaxResult::MinMax(min_shards, max_shards) = per_leader_open_shard_shuffled
1151+
// This is more of a loop-loop, but since we know it should exit before
1152+
// `num_available_ingesters`, we defensively use a for-loop.
1153+
for _ in 0..num_available_shards {
1154+
let MinMaxResult::MinMax(min_shards, max_shards) = per_leader_open_shards_shuffled
11671155
.iter_mut()
11681156
.minmax_by_key(|shards| shards.len())
11691157
else {
1158+
// There are less than 2 ingesters.
1159+
// Nothing to do here.
11701160
break;
11711161
};
1172-
if min_shards.len() < min_num_open_shards_per_leader
1173-
|| max_shards.len() > max_num_open_shards_per_leader
1162+
1163+
// We leave a tolerance of 1/10 between the min and max number of shards per leader
1164+
const TOLERANCE_INV_RATIO: usize = 10;
1165+
if max_shards.len()
1166+
< min_shards.len() + min_shards.len().div_ceil(TOLERANCE_INV_RATIO).max(2)
11741167
{
1175-
let shard = max_shards.pop().expect("shards should not be empty");
1176-
shards_to_rebalance.push(shard.clone());
1177-
min_shards.push(shard);
1178-
} else {
11791168
break;
11801169
}
1170+
1171+
let shard = max_shards.pop().expect("shards should not be empty");
1172+
shards_to_rebalance.push(shard.clone());
1173+
min_shards.push(shard);
11811174
}
1182-
let num_shards_to_rebalance = shards_to_rebalance.len();
11831175

1184-
if num_shards_to_rebalance == 0 {
1176+
if shards_to_rebalance.is_empty() {
11851177
debug!("no shards to rebalance");
11861178
} else {
11871179
info!(
1188-
num_open_shards,
1189-
num_available_ingesters = num_ingesters,
1190-
min_shards_threshold = min_num_open_shards_per_leader,
1191-
max_shards_threshold = max_num_open_shards_per_leader,
1192-
num_shards_to_rebalance,
1193-
"rebalancing {num_shards_to_rebalance} shards"
1180+
num_available_shards,
1181+
num_available_ingesters,
1182+
num_shards_to_rebalance = shards_to_rebalance.len(),
1183+
"rebalancing shards"
11941184
);
11951185
}
1186+
11961187
shards_to_rebalance
11971188
}
11981189

1190+
/// Attempts to close the list of shards passed as argument.
1191+
///
1192+
/// If ingesters are not available, the shards are not closed.
11991193
fn close_shards(
12001194
&self,
12011195
shards_to_close: Vec<Shard>,
@@ -1308,7 +1302,6 @@ fn find_scale_down_candidate(
13081302

13091303
#[cfg(test)]
13101304
mod tests {
1311-
13121305
use std::collections::BTreeSet;
13131306
use std::str::FromStr;
13141307
use std::sync::Arc;
@@ -3544,7 +3537,16 @@ mod tests {
35443537
);
35453538
}
35463539

3547-
fn test_compute_shards_to_rebalance_aux(shard_allocation: &[usize]) {
3540+
/// Test helper for compute_shards_to_rebalance.
3541+
/// The reason for testing both available and unavailable ingesters with open shards is to
3542+
/// ensure the algorithm holds up when there are open shards
3543+
///
3544+
/// - `available_ingester_shards`: open shards per available ingester
3545+
/// - `unavailable_ingester_shards`: open shards on unavailable ingesters
3546+
fn test_compute_shards_to_rebalance_aux(
3547+
available_ingester_shards: &[usize],
3548+
unavailable_ingester_shards: &[usize],
3549+
) {
35483550
let index_id = "test-index";
35493551
let index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index");
35503552
let index_uid = index_metadata.index_uid.clone();
@@ -3561,29 +3563,50 @@ mod tests {
35613563
let mock_ingester = MockIngesterService::new();
35623564
let ingester_client = IngesterServiceClient::from_mock(mock_ingester);
35633565

3564-
let ingester_ids: Vec<String> = (0..shard_allocation.len())
3565-
.map(|i| format!("test-ingester-{}", i))
3566+
let active_ids: Vec<String> = (0..available_ingester_shards.len())
3567+
.map(|i| format!("active-ingester-{}", i))
35663568
.collect();
35673569

3568-
for ingester_id in &ingester_ids {
3570+
for ingester_id in &active_ids {
35693571
ingester_pool.insert(NodeId::from(ingester_id.clone()), ingester_client.clone());
35703572
}
3571-
let mut shards = Vec::new();
35723573

3573-
for (ingester_idx, &num_shards) in shard_allocation.iter().enumerate() {
3574+
let inactive_ids: Vec<String> = (0..unavailable_ingester_shards.len())
3575+
.map(|i| format!("inactive-ingester-{}", i))
3576+
.collect();
3577+
3578+
let mut shards: Vec<Shard> = Vec::new();
3579+
let mut shard_id: u64 = 0;
3580+
3581+
for (idx, &num_shards) in available_ingester_shards.iter().enumerate() {
35743582
for _ in 0..num_shards {
3575-
let shard_id = shards.len() as u64;
3576-
let shard = Shard {
3583+
shards.push(Shard {
35773584
index_uid: Some(index_uid.clone()),
3578-
source_id: source_id.to_string(),
3585+
source_id: source_id.clone(),
35793586
shard_id: Some(ShardId::from(shard_id)),
3580-
leader_id: ingester_ids[ingester_idx].clone(),
3587+
leader_id: active_ids[idx].clone(),
35813588
shard_state: ShardState::Open as i32,
35823589
..Default::default()
3583-
};
3584-
shards.push(shard);
3590+
});
3591+
shard_id += 1;
3592+
}
3593+
}
3594+
3595+
// Shards on unavailable ingesters - these shouldn't affect rebalancing calculations
3596+
for (idx, &num_shards) in unavailable_ingester_shards.iter().enumerate() {
3597+
for _ in 0..num_shards {
3598+
shards.push(Shard {
3599+
index_uid: Some(index_uid.clone()),
3600+
source_id: source_id.clone(),
3601+
shard_id: Some(ShardId::from(shard_id)),
3602+
leader_id: inactive_ids[idx].clone(),
3603+
shard_state: ShardState::Open as i32,
3604+
..Default::default()
3605+
});
3606+
shard_id += 1;
35853607
}
35863608
}
3609+
35873610
model.insert_shards(&index_uid, &source_id, shards.clone());
35883611

35893612
let controller = IngestController::new(
@@ -3607,25 +3630,29 @@ mod tests {
36073630
let closed_shard_ids = model.close_shards(&source_uid, &shard_ids_to_rebalance);
36083631
assert_eq!(closed_shard_ids.len(), shards_to_rebalance.len());
36093632

3610-
let mut per_ingester_num_shards: HashMap<&str, usize> = HashMap::new();
3633+
let mut per_available_ingester_num_shards: HashMap<&str, usize> = active_ids
3634+
.iter()
3635+
.map(|active_id| (active_id.as_str(), 0))
3636+
.collect();
36113637

36123638
for shard in model.all_shards() {
3613-
if shard.is_open() {
3614-
*per_ingester_num_shards.entry(&shard.leader_id).or_default() += 1;
3639+
if !shard.is_open() {
3640+
continue;
3641+
}
3642+
if let Some(count_shard) =
3643+
per_available_ingester_num_shards.get_mut(shard.leader_id.as_str())
3644+
{
3645+
*count_shard += 1;
36153646
}
36163647
}
3617-
for ingester_id in &ingester_ids {
3618-
per_ingester_num_shards
3619-
.entry(ingester_id.as_str())
3620-
.or_default();
3621-
}
3622-
let mut per_ingester_num_shards_sorted: BTreeSet<(usize, &str)> = per_ingester_num_shards
3623-
.into_iter()
3624-
.map(|(ingester_id, num_shards)| (num_shards, ingester_id))
3625-
.collect();
3626-
let mut opened_shards: Vec<Shard> = Vec::new();
3627-
let mut shard_id = shards.len() as u64;
36283648

3649+
// Now we move the different shards.
3650+
let mut per_ingester_num_shards_sorted: BTreeSet<(usize, &str)> =
3651+
per_available_ingester_num_shards
3652+
.into_iter()
3653+
.map(|(ingester_id, num_shards)| (num_shards, ingester_id))
3654+
.collect();
3655+
let mut opened_shards: Vec<Shard> = Vec::new();
36293656
for _ in 0..shards_to_rebalance.len() {
36303657
let (num_shards, ingester_id) = per_ingester_num_shards_sorted.pop_first().unwrap();
36313658
let opened_shard = Shard {
@@ -3640,23 +3667,16 @@ mod tests {
36403667
opened_shards.push(opened_shard);
36413668
shard_id += 1;
36423669
}
3643-
let num_open_shards: usize = per_ingester_num_shards_sorted
3670+
3671+
if let Some((min_shards, max_shards)) = per_ingester_num_shards_sorted
36443672
.iter()
36453673
.map(|(num_shards, _)| num_shards)
3646-
.sum();
3647-
let target_num_open_shards_per_leader = num_open_shards as f32 / ingester_ids.len() as f32;
3648-
let max_num_open_shards_per_leader =
3649-
f32::ceil(target_num_open_shards_per_leader * 1.1) as usize;
3650-
let min_num_open_shards_per_leader =
3651-
f32::floor(target_num_open_shards_per_leader * 0.9) as usize;
3652-
assert!(
3653-
per_ingester_num_shards_sorted
3654-
.iter()
3655-
.all(
3656-
|(num_shards, _)| *num_shards >= min_num_open_shards_per_leader
3657-
&& *num_shards <= max_num_open_shards_per_leader
3658-
)
3659-
);
3674+
.copied()
3675+
.minmax()
3676+
.into_option()
3677+
{
3678+
assert!(min_shards + min_shards.div_ceil(10).max(2) >= max_shards);
3679+
}
36603680

36613681
// Test stability of the algorithm
36623682
model.insert_shards(&index_uid, &source_id, opened_shards);
@@ -3668,17 +3688,20 @@ mod tests {
36683688
proptest! {
36693689
#[test]
36703690
fn test_compute_shards_to_rebalance_proptest(
3671-
shard_allocation in proptest::collection::vec(0..13usize, 0..13usize),
3691+
active_shards in proptest::collection::vec(0..13usize, 0..13usize),
3692+
inactive_shards in proptest::collection::vec(0..13usize, 0..5usize),
36723693
) {
3673-
test_compute_shards_to_rebalance_aux(&shard_allocation);
3694+
test_compute_shards_to_rebalance_aux(&active_shards, &inactive_shards);
36743695
}
36753696
}
36763697

36773698
#[test]
36783699
fn test_compute_shards_to_rebalance() {
3679-
test_compute_shards_to_rebalance_aux(&[]);
3680-
test_compute_shards_to_rebalance_aux(&[0]);
3681-
test_compute_shards_to_rebalance_aux(&[1]);
3682-
test_compute_shards_to_rebalance_aux(&[0, 1]);
3700+
test_compute_shards_to_rebalance_aux(&[], &[]);
3701+
test_compute_shards_to_rebalance_aux(&[0], &[]);
3702+
test_compute_shards_to_rebalance_aux(&[1], &[]);
3703+
test_compute_shards_to_rebalance_aux(&[0, 1], &[]);
3704+
test_compute_shards_to_rebalance_aux(&[0, 1], &[1]);
3705+
test_compute_shards_to_rebalance_aux(&[0, 1, 2], &[3, 4]);
36833706
}
36843707
}

0 commit comments

Comments
 (0)