Skip to content

Commit 2bcade6

Browse files
authored
Merge feature node based routing (quickwit-oss#6203)
* Implement IngesterCapacityScore broadcast (quickwit-oss#6152) * Implement node based routing table (quickwit-oss#6159) * Use new node based routing table for routing decisions (quickwit-oss#6163) * Piggyback routing update on persist response (quickwit-oss#6173) * Remove unused shard_ids in persist protos (quickwit-oss#6169) * Add availability zone awareness to node based routing (quickwit-oss#6189) * Remove old routing table; Take both disk and memory WAL readings (quickwit-oss#6193) * Add az-aware ingest attempts metric (quickwit-oss#6194)
1 parent 8954dde commit 2bcade6

File tree

31 files changed

+2416
-2434
lines changed

31 files changed

+2416
-2434
lines changed

quickwit/quickwit-cluster/src/node.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl ClusterNode {
5050
ingester_status: member.ingester_status,
5151
is_ready: member.is_ready,
5252
is_self_node,
53+
availability_zone: member.availability_zone,
5354
};
5455
let node = ClusterNode {
5556
inner: Arc::new(inner),
@@ -141,6 +142,10 @@ impl ClusterNode {
141142
pub fn is_self_node(&self) -> bool {
142143
self.inner.is_self_node
143144
}
145+
146+
pub fn availability_zone(&self) -> Option<&str> {
147+
self.inner.availability_zone.as_deref()
148+
}
144149
}
145150

146151
impl Debug for ClusterNode {
@@ -162,6 +167,7 @@ impl PartialEq for ClusterNode {
162167
&& self.inner.indexing_tasks == other.inner.indexing_tasks
163168
&& self.inner.is_ready == other.inner.is_ready
164169
&& self.inner.is_self_node == other.inner.is_self_node
170+
&& self.inner.availability_zone == other.inner.availability_zone
165171
}
166172
}
167173

@@ -175,4 +181,5 @@ struct InnerNode {
175181
ingester_status: IngesterStatus,
176182
is_ready: bool,
177183
is_self_node: bool,
184+
availability_zone: Option<String>,
178185
}

quickwit/quickwit-common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub mod rate_limited_tracing;
3636
pub mod rate_limiter;
3737
pub mod rendezvous_hasher;
3838
pub mod retry;
39+
pub mod ring_buffer;
3940
pub mod runtimes;
4041
pub mod shared_consts;
4142
pub mod sorted_iter;
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::{Debug, Formatter};
16+
17+
/// Fixed-size buffer that keeps the last N elements pushed into it.
18+
///
19+
/// `head` is the write cursor. It advances by one on each push and wraps
20+
/// back to 0 when it reaches N, overwriting the oldest element.
21+
///
22+
/// ```text
23+
/// RingBuffer<u32, 4> after pushing 1, 2, 3, 4, 5, 6:
24+
///
25+
/// buffer = [5, 6, 3, 4] head = 2 len = 4
26+
/// ^
27+
/// next write goes here
28+
///
29+
/// logical view (oldest → newest): [3, 4, 5, 6]
30+
/// ```
31+
pub struct RingBuffer<T: Copy + Default, const N: usize> {
32+
buffer: [T; N],
33+
head: usize,
34+
len: usize,
35+
}
36+
37+
impl<T: Copy + Default, const N: usize> Default for RingBuffer<T, N> {
38+
fn default() -> Self {
39+
Self {
40+
buffer: [T::default(); N],
41+
head: 0,
42+
len: 0,
43+
}
44+
}
45+
}
46+
47+
impl<T: Copy + Default + Debug, const N: usize> Debug for RingBuffer<T, N> {
48+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49+
f.debug_list().entries(self.iter()).finish()
50+
}
51+
}
52+
53+
impl<T: Copy + Default, const N: usize> RingBuffer<T, N> {
54+
pub fn push_back(&mut self, value: T) {
55+
self.buffer[self.head] = value;
56+
self.head = (self.head + 1) % N;
57+
if self.len < N {
58+
self.len += 1;
59+
}
60+
}
61+
62+
pub fn last(&self) -> Option<T> {
63+
if self.len == 0 {
64+
return None;
65+
}
66+
Some(self.buffer[(self.head + N - 1) % N])
67+
}
68+
69+
pub fn front(&self) -> Option<T> {
70+
if self.len == 0 {
71+
return None;
72+
}
73+
Some(self.buffer[(self.head + N - self.len) % N])
74+
}
75+
76+
pub fn len(&self) -> usize {
77+
self.len
78+
}
79+
80+
pub fn is_empty(&self) -> bool {
81+
self.len == 0
82+
}
83+
84+
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
85+
let start = (self.head + N - self.len) % N;
86+
(0..self.len).map(move |i| &self.buffer[(start + i) % N])
87+
}
88+
}
89+
90+
#[cfg(test)]
91+
mod tests {
92+
use super::*;
93+
94+
#[test]
95+
fn test_empty() {
96+
let rb = RingBuffer::<u32, 4>::default();
97+
assert!(rb.is_empty());
98+
assert_eq!(rb.len(), 0);
99+
assert_eq!(rb.last(), None);
100+
assert_eq!(rb.front(), None);
101+
assert_eq!(rb.iter().count(), 0);
102+
}
103+
104+
#[test]
105+
fn test_single_push() {
106+
let mut rb = RingBuffer::<u32, 4>::default();
107+
rb.push_back(10);
108+
assert_eq!(rb.len(), 1);
109+
assert!(!rb.is_empty());
110+
assert_eq!(rb.last(), Some(10));
111+
assert_eq!(rb.front(), Some(10));
112+
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![10]);
113+
}
114+
115+
#[test]
116+
fn test_partial_fill() {
117+
let mut rb = RingBuffer::<u32, 4>::default();
118+
rb.push_back(1);
119+
rb.push_back(2);
120+
rb.push_back(3);
121+
assert_eq!(rb.len(), 3);
122+
assert_eq!(rb.last(), Some(3));
123+
assert_eq!(rb.front(), Some(1));
124+
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
125+
}
126+
127+
#[test]
128+
fn test_exactly_full() {
129+
let mut rb = RingBuffer::<u32, 4>::default();
130+
for i in 1..=4 {
131+
rb.push_back(i);
132+
}
133+
assert_eq!(rb.len(), 4);
134+
assert_eq!(rb.last(), Some(4));
135+
assert_eq!(rb.front(), Some(1));
136+
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3, 4]);
137+
}
138+
139+
#[test]
140+
fn test_wrap_around() {
141+
let mut rb = RingBuffer::<u32, 4>::default();
142+
for i in 1..=6 {
143+
rb.push_back(i);
144+
}
145+
assert_eq!(rb.len(), 4);
146+
assert_eq!(rb.last(), Some(6));
147+
assert_eq!(rb.front(), Some(3));
148+
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![3, 4, 5, 6]);
149+
}
150+
151+
#[test]
152+
fn test_many_wraps() {
153+
let mut rb = RingBuffer::<u32, 3>::default();
154+
for i in 1..=100 {
155+
rb.push_back(i);
156+
}
157+
assert_eq!(rb.len(), 3);
158+
assert_eq!(rb.last(), Some(100));
159+
assert_eq!(rb.front(), Some(98));
160+
assert_eq!(rb.iter().copied().collect::<Vec<_>>(), vec![98, 99, 100]);
161+
}
162+
163+
#[test]
164+
fn test_debug() {
165+
let mut rb = RingBuffer::<u32, 3>::default();
166+
rb.push_back(1);
167+
rb.push_back(2);
168+
assert_eq!(format!("{:?}", rb), "[1, 2]");
169+
}
170+
}

quickwit/quickwit-common/src/shared_consts.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";
6767
/// Key used in chitchat to broadcast the status of an ingester.
6868
pub const INGESTER_STATUS_KEY: &str = "ingester.status";
6969

70+
/// Prefix used in chitchat to broadcast per-source ingester capacity scores and open shard counts.
71+
pub const INGESTER_CAPACITY_SCORE_PREFIX: &str = "ingester.capacity_score:";
72+
7073
/// File name for the encoded list of fields in the split
7174
pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields";
7275

quickwit/quickwit-control-plane/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ quickwit-cluster = { workspace = true, features = ["testsuite"] }
4747
quickwit-common = { workspace = true, features = ["testsuite"] }
4848
quickwit-config = { workspace = true, features = ["testsuite"] }
4949
quickwit-indexing = { workspace = true }
50+
quickwit-ingest = { workspace = true, features = ["testsuite"] }
5051
quickwit-metastore = { workspace = true, features = ["testsuite"] }
5152
quickwit-proto = { workspace = true, features = ["testsuite"] }
5253

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2223,11 +2223,8 @@ mod tests {
22232223
assert!(&retain_shards_for_source.shard_ids.is_empty());
22242224
Ok(RetainShardsResponse {})
22252225
});
2226-
let client = IngesterServiceClient::from_mock(mock_ingester);
2227-
let ingester = IngesterPoolEntry {
2228-
client,
2229-
status: IngesterStatus::Ready,
2230-
};
2226+
let ingester =
2227+
IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
22312228
ingester_pool.insert("node1".into(), ingester);
22322229

22332230
let cluster_config = ClusterConfig::for_test();
@@ -2273,11 +2270,8 @@ mod tests {
22732270
);
22742271
Ok(RetainShardsResponse {})
22752272
});
2276-
let client = IngesterServiceClient::from_mock(mock_ingester);
2277-
let ingester = IngesterPoolEntry {
2278-
client,
2279-
status: IngesterStatus::Ready,
2280-
};
2273+
let ingester =
2274+
IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
22812275
ingester_pool.insert("node1".into(), ingester);
22822276

22832277
let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
@@ -2652,11 +2646,8 @@ mod tests {
26522646
};
26532647
Ok(response)
26542648
});
2655-
let client = IngesterServiceClient::from_mock(mock_ingester);
2656-
let ingester = IngesterPoolEntry {
2657-
client,
2658-
status: IngesterStatus::Ready,
2659-
};
2649+
let ingester =
2650+
IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
26602651
ingester_pool.insert(ingester_id, ingester);
26612652

26622653
let mut mock_metastore = MockMetastoreService::new();
@@ -2810,11 +2801,8 @@ mod tests {
28102801
};
28112802
Ok(response)
28122803
});
2813-
let client = IngesterServiceClient::from_mock(mock_ingester);
2814-
let ingester = IngesterPoolEntry {
2815-
client,
2816-
status: IngesterStatus::Ready,
2817-
};
2804+
let ingester =
2805+
IngesterPoolEntry::ready_with_client(IngesterServiceClient::from_mock(mock_ingester));
28182806
ingester_pool.insert(ingester_id, ingester);
28192807

28202808
let mut mock_metastore = MockMetastoreService::new();

0 commit comments

Comments
 (0)