Skip to content

Commit 79757e9

Browse files
authored
Add AZ awareness to node state (quickwit-oss#6092)
* Initial stab at AZ awareness

  Signed-off-by: Nadav Gov-Ari <nadav.govari@datadoghq.com>

* Comment

  Signed-off-by: Nadav Gov-Ari <nadav.govari@datadoghq.com>

* Add tests, PR comments

  Signed-off-by: Nadav Gov-Ari <nadav.govari@datadoghq.com>

---------

Signed-off-by: Nadav Gov-Ari <nadav.govari@datadoghq.com>
1 parent d8fc982 commit 79757e9

File tree

11 files changed

+60
-27
lines changed

11 files changed

+60
-27
lines changed

quickwit/quickwit-cli/src/tool.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
938938
grpc_advertise_addr: config.grpc_advertise_addr,
939939
indexing_cpu_capacity: CpuCapacity::zero(),
940940
indexing_tasks: Vec::new(),
941+
availability_zone: None,
941942
};
942943
let client_grpc_config = make_client_grpc_config(&config.grpc_config)?;
943944
let cluster = Cluster::join(

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ use tracing::{info, warn};
3939
use crate::change::{ClusterChange, ClusterChangeStreamFactory, compute_cluster_change_events};
4040
use crate::grpc_gossip::spawn_catchup_callback_task;
4141
use crate::member::{
42-
ClusterMember, ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, NodeStateExt,
43-
PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY, READINESS_VALUE_READY,
44-
build_cluster_member,
42+
AVAILABILITY_ZONE_KEY, ClusterMember, ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY,
43+
NodeStateExt, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
44+
READINESS_VALUE_READY, build_cluster_member,
4545
};
4646
use crate::metrics::spawn_metrics_task;
4747
use crate::{ClusterChangeStream, ClusterNode};
@@ -150,25 +150,26 @@ impl Cluster {
150150
catchup_callback: Some(Box::new(catchup_callback)),
151151
extra_liveness_predicate: Some(Box::new(extra_liveness_predicate)),
152152
};
153-
let chitchat_handle = spawn_chitchat(
154-
chitchat_config,
155-
vec![
156-
(
157-
ENABLED_SERVICES_KEY.to_string(),
158-
self_node.enabled_services.iter().join(","),
159-
),
160-
(
161-
GRPC_ADVERTISE_ADDR_KEY.to_string(),
162-
self_node.grpc_advertise_addr.to_string(),
163-
),
164-
(
165-
READINESS_KEY.to_string(),
166-
READINESS_VALUE_NOT_READY.to_string(),
167-
),
168-
],
169-
transport,
170-
)
171-
.await?;
153+
let mut initial_key_values = vec![
154+
(
155+
ENABLED_SERVICES_KEY.to_string(),
156+
self_node.enabled_services.iter().join(","),
157+
),
158+
(
159+
GRPC_ADVERTISE_ADDR_KEY.to_string(),
160+
self_node.grpc_advertise_addr.to_string(),
161+
),
162+
(
163+
READINESS_KEY.to_string(),
164+
READINESS_VALUE_NOT_READY.to_string(),
165+
),
166+
];
167+
168+
if let Some(az) = &self_node.availability_zone {
169+
initial_key_values.push((AVAILABILITY_ZONE_KEY.to_string(), az.clone()));
170+
}
171+
let chitchat_handle =
172+
spawn_chitchat(chitchat_config, initial_key_values, transport).await?;
172173

173174
let chitchat = chitchat_handle.chitchat();
174175
let chitchat_guard = chitchat.lock().await;
@@ -706,6 +707,7 @@ pub async fn create_cluster_for_test_with_id(
706707
grpc_advertise_addr: grpc_addr_from_listen_addr_for_test(gossip_advertise_addr),
707708
indexing_tasks: Vec::new(),
708709
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
710+
availability_zone: None,
709711
};
710712
let failure_detector_config = create_failure_detector_config_for_test();
711713
let cluster = Cluster::join(

quickwit/quickwit-cluster/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
143143
grpc_advertise_addr: node_config.grpc_advertise_addr,
144144
indexing_tasks,
145145
indexing_cpu_capacity,
146+
availability_zone: node_config.availability_zone.clone(),
146147
};
147148
let failure_detector_config = FailureDetectorConfig {
148149
dead_node_grace_period: Duration::from_secs(2 * 60 * 60), // 2 hours

quickwit/quickwit-cluster/src/member.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub(crate) const READINESS_KEY: &str = "readiness";
3636
pub(crate) const READINESS_VALUE_READY: &str = "READY";
3737
pub(crate) const READINESS_VALUE_NOT_READY: &str = "NOT_READY";
3838

39+
pub(crate) const AVAILABILITY_ZONE_KEY: &str = "availability_zone";
40+
3941
pub const INDEXING_CPU_CAPACITY_KEY: &str = "indexing_cpu_capacity";
4042

4143
pub(crate) trait NodeStateExt {
@@ -44,6 +46,8 @@ pub(crate) trait NodeStateExt {
4446
fn is_ready(&self) -> bool;
4547

4648
fn size_bytes(&self) -> usize;
49+
50+
fn availability_zone(&self) -> Option<String>;
4751
}
4852

4953
impl NodeStateExt for NodeState {
@@ -74,6 +78,10 @@ impl NodeStateExt for NodeState {
7478
.map(|(key, value)| key.len() + value.value.len() + SIZE_OF_VERSION + SIZE_OF_TOMBSTONE)
7579
.sum()
7680
}
81+
82+
fn availability_zone(&self) -> Option<String> {
83+
self.get(AVAILABILITY_ZONE_KEY).map(|az| az.to_string())
84+
}
7785
}
7886

7987
/// Cluster member.
@@ -101,6 +109,8 @@ pub struct ClusterMember {
101109
/// Indexing cpu capacity of the node expressed in milli cpu.
102110
pub indexing_cpu_capacity: CpuCapacity,
103111
pub is_ready: bool,
112+
/// Availability zone the node is running in, if enabled.
113+
pub availability_zone: Option<String>,
104114
}
105115

106116
impl ClusterMember {
@@ -149,6 +159,7 @@ pub(crate) fn build_cluster_member(
149159
.map(|enabled_services_str| {
150160
parse_enabled_services_str(enabled_services_str, &chitchat_id.node_id)
151161
})?;
162+
let availability_zone = node_state.availability_zone();
152163
let grpc_advertise_addr = node_state.grpc_advertise_addr()?;
153164
let indexing_tasks = parse_indexing_tasks(node_state);
154165
let indexing_cpu_capacity = parse_indexing_cpu_capacity(node_state);
@@ -161,6 +172,7 @@ pub(crate) fn build_cluster_member(
161172
grpc_advertise_addr,
162173
indexing_tasks,
163174
indexing_cpu_capacity,
175+
availability_zone,
164176
};
165177
Ok(member)
166178
}

quickwit/quickwit-config/resources/tests/node_config/quickwit.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"version": "0.7",
44
"cluster_id": "quickwit-cluster",
55
"node_id": "my-unique-node-id",
6+
"availability_zone": "az-1",
67
"enabled_services": [
78
"janitor",
89
"metastore"

quickwit/quickwit-config/resources/tests/node_config/quickwit.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ version = "0.7"
22

33
cluster_id = "quickwit-cluster"
44
node_id = "my-unique-node-id"
5+
availability_zone = "az-1"
56
enabled_services = [ "janitor", "metastore" ]
67
listen_address = "0.0.0.0"
78
advertise_address = "172.0.0.12"

quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ version: 0.8
22

33
cluster_id: quickwit-cluster
44
node_id: my-unique-node-id
5+
availability_zone: az-1
56
enabled_services:
67
- janitor
78
- metastore

quickwit/quickwit-config/src/config_value.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,17 @@ where
4141
}
4242
}
4343

44-
#[cfg(test)]
45-
pub(crate) fn for_test(value: T) -> Self {
44+
pub(crate) fn none() -> Self {
4645
Self {
47-
provided: Some(value),
46+
provided: None,
4847
default: None,
4948
}
5049
}
5150

5251
#[cfg(test)]
53-
pub(crate) fn none() -> Self {
52+
pub(crate) fn for_test(value: T) -> Self {
5453
Self {
55-
provided: None,
54+
provided: Some(value),
5655
default: None,
5756
}
5857
}

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ impl Default for JaegerConfig {
566566
pub struct NodeConfig {
567567
pub cluster_id: String,
568568
pub node_id: NodeId,
569+
pub availability_zone: Option<String>,
569570
pub enabled_services: HashSet<QuickwitService>,
570571
pub gossip_listen_addr: SocketAddr,
571572
pub grpc_listen_addr: SocketAddr,

quickwit/quickwit-config/src/node_config/serialize.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ fn default_node_id() -> ConfigValue<String, QW_NODE_ID> {
6666
ConfigValue::with_default(node_id)
6767
}
6868

69+
fn default_availability_zone() -> ConfigValue<String, QW_AVAILABILITY_ZONE> {
70+
ConfigValue::none()
71+
}
72+
6973
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
7074
struct List(Vec<String>);
7175

@@ -169,6 +173,8 @@ struct NodeConfigBuilder {
169173
cluster_id: ConfigValue<String, QW_CLUSTER_ID>,
170174
#[serde(default = "default_node_id")]
171175
node_id: ConfigValue<String, QW_NODE_ID>,
176+
#[serde(default = "default_availability_zone")]
177+
availability_zone: ConfigValue<String, QW_AVAILABILITY_ZONE>,
172178
#[serde(default = "default_enabled_services")]
173179
enabled_services: ConfigValue<List, QW_ENABLED_SERVICES>,
174180
#[serde(default = "default_listen_address")]
@@ -218,6 +224,7 @@ impl NodeConfigBuilder {
218224
env_vars: &HashMap<String, String>,
219225
) -> anyhow::Result<NodeConfig> {
220226
let node_id = self.node_id.resolve(env_vars).map(NodeId::new)?;
227+
let availability_zone = self.availability_zone.resolve_optional(env_vars)?;
221228

222229
let enabled_services = self
223230
.enabled_services
@@ -305,6 +312,7 @@ impl NodeConfigBuilder {
305312
let node_config = NodeConfig {
306313
cluster_id: self.cluster_id.resolve(env_vars)?,
307314
node_id,
315+
availability_zone,
308316
enabled_services,
309317
gossip_listen_addr,
310318
grpc_listen_addr,
@@ -401,6 +409,7 @@ impl Default for NodeConfigBuilder {
401409
Self {
402410
cluster_id: default_cluster_id(),
403411
node_id: default_node_id(),
412+
availability_zone: ConfigValue::none(),
404413
enabled_services: default_enabled_services(),
405414
listen_address: default_listen_address(),
406415
rest_listen_port: None,
@@ -469,6 +478,7 @@ pub fn node_config_for_tests_from_ports(
469478
) -> NodeConfig {
470479
let node_id = NodeId::new(default_node_id().unwrap());
471480
let enabled_services = QuickwitService::supported_services();
481+
let availability_zone = Some(String::from("az-1"));
472482
let listen_address = Host::default();
473483
let rest_listen_addr = listen_address
474484
.with_port(rest_listen_port)
@@ -499,6 +509,7 @@ pub fn node_config_for_tests_from_ports(
499509
NodeConfig {
500510
cluster_id: default_cluster_id().unwrap(),
501511
node_id,
512+
availability_zone,
502513
enabled_services,
503514
gossip_advertise_addr: gossip_listen_addr,
504515
grpc_advertise_addr: grpc_listen_addr,
@@ -553,6 +564,7 @@ mod tests {
553564
assert!(config.is_service_enabled(QuickwitService::Janitor));
554565
assert!(config.is_service_enabled(QuickwitService::Metastore));
555566

567+
assert_eq!(config.availability_zone.unwrap(), "az-1");
556568
assert_eq!(
557569
config.rest_config.listen_addr,
558570
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1111)
@@ -738,6 +750,7 @@ mod tests {
738750
.unwrap();
739751
assert_eq!(config.cluster_id, DEFAULT_CLUSTER_ID);
740752
assert_eq!(config.node_id, get_short_hostname().unwrap());
753+
assert_eq!(config.availability_zone, None);
741754
assert_eq!(
742755
config.enabled_services,
743756
QuickwitService::supported_services()

0 commit comments

Comments
 (0)