Skip to content

Commit e0cd299

Browse files
committed
Enforce non-zero replica scale and workers
This commit changes `ReplicaAllocation` to enforce non-zero values for `scale` and `workers`, making Materialize refuse to start with replica sizes that specify zero for either value. Previously, Materialize would start up but would panic as soon as one tried to create a replica with such an invalid size.
1 parent e3304a8 commit e0cd299

File tree

8 files changed

+70
-47
lines changed

8 files changed

+70
-47
lines changed

misc/python/materialize/mzcompose/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ def replica_size(
720720
"scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False),
721721
"scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False),
722722
# Intentionally not following the naming scheme
723-
"free": replica_size(0, 0, disabled=True),
723+
"free": replica_size(1, 1, disabled=True),
724724
}
725725

726726
for i in range(0, 6):

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2136,7 +2136,7 @@ impl CatalogState {
21362136

21372137
let row = Row::pack_slice(&[
21382138
size.as_str().into(),
2139-
u64::from(alloc.scale).into(),
2139+
u64::cast_from(alloc.scale).into(),
21402140
u64::cast_from(alloc.workers).into(),
21412141
cpu_limit.as_nanocpus().into(),
21422142
memory_bytes.into(),

src/catalog/src/config.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// by the Apache License, Version 2.0.
99

1010
use std::collections::{BTreeMap, BTreeSet};
11+
use std::num::NonZero;
1112

1213
use anyhow::bail;
1314
use bytesize::ByteSize;
@@ -122,7 +123,7 @@ impl ClusterReplicaSizeMap {
122123
bail!("No memory limit found in cluster definition for {name}");
123124
};
124125
replica.credits_per_hour = Numeric::from(
125-
(memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
126+
(memory_limit.0 * replica.scale.get() * u64::cast_from(replica.workers)).0,
126127
) / Numeric::from(1 * GIB);
127128
}
128129
}
@@ -170,7 +171,7 @@ impl ClusterReplicaSizeMap {
170171
// }
171172
let mut inner = (0..=5)
172173
.flat_map(|i| {
173-
let workers: u8 = 1 << i;
174+
let workers = 1 << i;
174175
[
175176
(format!("scale=1,workers={workers}"), None),
176177
(format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
@@ -185,8 +186,8 @@ impl ClusterReplicaSizeMap {
185186
memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
186187
cpu_limit: None,
187188
disk_limit: None,
188-
scale: 1,
189-
workers: workers.into(),
189+
scale: NonZero::new(1).expect("not zero"),
190+
workers: NonZero::new(workers).expect("not zero"),
190191
credits_per_hour: 1.into(),
191192
cpu_exclusive: false,
192193
is_cc: false,
@@ -207,8 +208,8 @@ impl ClusterReplicaSizeMap {
207208
memory_limit: None,
208209
cpu_limit: None,
209210
disk_limit: None,
210-
scale,
211-
workers: 1,
211+
scale: NonZero::new(scale).expect("not zero"),
212+
workers: NonZero::new(1).expect("not zero"),
212213
credits_per_hour: scale.into(),
213214
cpu_exclusive: false,
214215
is_cc: false,
@@ -224,8 +225,8 @@ impl ClusterReplicaSizeMap {
224225
memory_limit: None,
225226
cpu_limit: None,
226227
disk_limit: None,
227-
scale,
228-
workers: scale.into(),
228+
scale: NonZero::new(scale).expect("not zero"),
229+
workers: NonZero::new(scale.into()).expect("not zero"),
229230
credits_per_hour: scale.into(),
230231
cpu_exclusive: false,
231232
is_cc: false,
@@ -241,8 +242,8 @@ impl ClusterReplicaSizeMap {
241242
memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
242243
cpu_limit: None,
243244
disk_limit: None,
244-
scale: 1,
245-
workers: 8,
245+
scale: NonZero::new(1).expect("not zero"),
246+
workers: NonZero::new(8).expect("not zero"),
246247
credits_per_hour: 1.into(),
247248
cpu_exclusive: false,
248249
is_cc: false,
@@ -259,8 +260,8 @@ impl ClusterReplicaSizeMap {
259260
memory_limit: None,
260261
cpu_limit: None,
261262
disk_limit: None,
262-
scale: 2,
263-
workers: 4,
263+
scale: NonZero::new(2).expect("not zero"),
264+
workers: NonZero::new(4).expect("not zero"),
264265
credits_per_hour: 2.into(),
265266
cpu_exclusive: false,
266267
is_cc: false,
@@ -276,8 +277,8 @@ impl ClusterReplicaSizeMap {
276277
memory_limit: None,
277278
cpu_limit: None,
278279
disk_limit: None,
279-
scale: 0,
280-
workers: 0,
280+
scale: NonZero::new(1).expect("not zero"),
281+
workers: NonZero::new(1).expect("not zero"),
281282
credits_per_hour: 0.into(),
282283
cpu_exclusive: false,
283284
is_cc: true,

src/controller/src/clusters.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
1212
use std::collections::{BTreeMap, BTreeSet};
1313
use std::fmt;
14+
use std::num::NonZero;
1415
use std::str::FromStr;
1516
use std::sync::Arc;
1617
use std::sync::LazyLock;
@@ -34,9 +35,9 @@ use mz_orchestrator::{
3435
CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
3536
ServiceEvent, ServicePort,
3637
};
37-
use mz_ore::halt;
38-
use mz_ore::instrument;
38+
use mz_ore::cast::CastInto;
3939
use mz_ore::task::{self, AbortOnDropHandle};
40+
use mz_ore::{halt, instrument};
4041
use mz_repr::GlobalId;
4142
use mz_repr::adt::numeric::Numeric;
4243
use regex::Regex;
@@ -80,9 +81,9 @@ pub struct ReplicaAllocation {
8081
/// The disk limit for each process in the replica.
8182
pub disk_limit: Option<DiskLimit>,
8283
/// The number of processes in the replica.
83-
pub scale: u16,
84+
pub scale: NonZero<u16>,
8485
/// The number of worker threads in the replica.
85-
pub workers: usize,
86+
pub workers: NonZero<usize>,
8687
/// The number of credits per hour that the replica consumes.
8788
#[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
8889
pub credits_per_hour: Numeric,
@@ -113,6 +114,7 @@ fn default_true() -> bool {
113114
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
114115
fn test_replica_allocation_deserialization() {
115116
use bytesize::ByteSize;
117+
use mz_ore::{assert_err, assert_ok};
116118

117119
let data = r#"
118120
{
@@ -143,8 +145,8 @@ fn test_replica_allocation_deserialization() {
143145
cpu_exclusive: false,
144146
is_cc: true,
145147
swap_enabled: true,
146-
scale: 16,
147-
workers: 1,
148+
scale: NonZero::new(16).unwrap(),
149+
workers: NonZero::new(1).unwrap(),
148150
selectors: BTreeMap::from([
149151
("key1".to_string(), "value1".to_string()),
150152
("key2".to_string(), "value2".to_string())
@@ -157,8 +159,8 @@ fn test_replica_allocation_deserialization() {
157159
"cpu_limit": 0,
158160
"memory_limit": "0GiB",
159161
"disk_limit": "0MiB",
160-
"scale": 0,
161-
"workers": 0,
162+
"scale": 1,
163+
"workers": 1,
162164
"credits_per_hour": "0",
163165
"cpu_exclusive": true,
164166
"disabled": true
@@ -178,11 +180,19 @@ fn test_replica_allocation_deserialization() {
178180
cpu_exclusive: true,
179181
is_cc: true,
180182
swap_enabled: false,
181-
scale: 0,
182-
workers: 0,
183+
scale: NonZero::new(1).unwrap(),
184+
workers: NonZero::new(1).unwrap(),
183185
selectors: Default::default(),
184186
}
185187
);
188+
189+
// `scale` and `workers` must be non-zero.
190+
let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
191+
assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
192+
let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
193+
assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
194+
let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
195+
assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
186196
}
187197

188198
/// Configures the location of a cluster replica.
@@ -202,7 +212,7 @@ impl ReplicaLocation {
202212
computectl_addrs, ..
203213
}) => computectl_addrs.len(),
204214
ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
205-
allocation.scale.into()
215+
allocation.scale.cast_into()
206216
}
207217
}
208218
}
@@ -229,7 +239,7 @@ impl ReplicaLocation {
229239
pub fn workers(&self) -> Option<usize> {
230240
match self {
231241
ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
232-
Some(allocation.workers * self.num_processes())
242+
Some(allocation.workers.get() * self.num_processes())
233243
}
234244
ReplicaLocation::Unmanaged(_) => None,
235245
}
@@ -661,12 +671,12 @@ where
661671
init_container_image: self.init_container_image.clone(),
662672
args: Box::new(move |assigned| {
663673
let storage_timely_config = TimelyConfig {
664-
workers: location.allocation.workers,
674+
workers: location.allocation.workers.get(),
665675
addresses: assigned.peer_addresses("storage"),
666676
..storage_proto_timely_config
667677
};
668678
let compute_timely_config = TimelyConfig {
669-
workers: location.allocation.workers,
679+
workers: location.allocation.workers.get(),
670680
addresses: assigned.peer_addresses("compute"),
671681
..compute_proto_timely_config
672682
};

src/orchestrator-kubernetes/src/lib.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
use std::collections::BTreeMap;
1111
use std::future::Future;
12+
use std::num::NonZero;
1213
use std::sync::{Arc, Mutex};
1314
use std::time::{Duration, Instant};
1415
use std::{env, fmt};
@@ -49,6 +50,7 @@ use mz_orchestrator::{
4950
OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
5051
ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
5152
};
53+
use mz_ore::cast::CastInto;
5254
use mz_ore::retry::Retry;
5355
use mz_ore::task::AbortOnDropHandle;
5456
use serde::Deserialize;
@@ -216,7 +218,7 @@ impl Orchestrator for KubernetesOrchestrator {
216218

217219
#[derive(Clone, Copy)]
218220
struct ServiceInfo {
219-
scale: u16,
221+
scale: NonZero<u16>,
220222
}
221223

222224
struct NamespacedKubernetesOrchestrator {
@@ -267,7 +269,7 @@ enum WorkerCommand {
267269
#[derive(Debug, Clone)]
268270
struct ServiceDescription {
269271
name: String,
270-
scale: u16,
272+
scale: NonZero<u16>,
271273
service: K8sService,
272274
stateful_set: StatefulSet,
273275
pod_template_hash: String,
@@ -655,7 +657,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
655657
status: None,
656658
};
657659

658-
let hosts = (0..scale)
660+
let hosts = (0..scale.get())
659661
.map(|i| {
660662
format!(
661663
"{name}-{i}.{name}.{}.svc.cluster.local",
@@ -751,7 +753,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
751753
let topology_spread = if scheduling_config.topology_spread.enabled {
752754
let config = &scheduling_config.topology_spread;
753755

754-
if !config.ignore_non_singular_scale || scale <= 1 {
756+
if !config.ignore_non_singular_scale || scale.get() == 1 {
755757
let label_selector_requirements = (if config.ignore_non_singular_scale {
756758
let mut replicas_selector_ignoring_scale = replicas_selector.clone();
757759

@@ -1196,7 +1198,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
11961198
..Default::default()
11971199
},
11981200
service_name: Some(name.clone()),
1199-
replicas: Some(scale.into()),
1201+
replicas: Some(scale.cast_into()),
12001202
template: pod_template_spec,
12011203
update_strategy: Some(StatefulSetUpdateStrategy {
12021204
type_: Some("OnDelete".to_owned()),
@@ -1418,7 +1420,7 @@ impl OrchestratorWorker {
14181420
info: &ServiceInfo,
14191421
) -> Vec<ServiceProcessMetrics> {
14201422
if !self.collect_pod_metrics {
1421-
return (0..info.scale)
1423+
return (0..info.scale.get())
14221424
.map(|_| ServiceProcessMetrics::default())
14231425
.collect();
14241426
}
@@ -1559,8 +1561,9 @@ impl OrchestratorWorker {
15591561
Ok(usage)
15601562
}
15611563

1562-
let ret =
1563-
futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into())));
1564+
let ret = futures::future::join_all(
1565+
(0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
1566+
);
15641567

15651568
ret.await
15661569
}
@@ -1611,7 +1614,7 @@ impl OrchestratorWorker {
16111614
// Our pod recreation policy is simple: If a pod's template hash changed, delete it, and
16121615
// let the StatefulSet controller recreate it. Otherwise, patch the existing pod's
16131616
// annotations to line up with the ones in the spec.
1614-
for pod_id in 0..desc.scale {
1617+
for pod_id in 0..desc.scale.get() {
16151618
let pod_name = format!("{}-{pod_id}", desc.name);
16161619
let pod = match self.pod_api.get(&pod_name).await {
16171620
Ok(pod) => pod,

src/orchestrator-process/src/lib.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::fmt::Debug;
1414
use std::fs::Permissions;
1515
use std::future::Future;
1616
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
17+
use std::num::NonZero;
1718
use std::os::unix::fs::PermissionsExt;
1819
use std::os::unix::process::ExitStatusExt;
1920
use std::path::{Path, PathBuf};
@@ -453,7 +454,7 @@ struct EnsureServiceConfig {
453454
/// An optional limit on the CPU that the service can use.
454455
pub cpu_limit: Option<CpuLimit>,
455456
/// The number of copies of this service to run.
456-
pub scale: u16,
457+
pub scale: NonZero<u16>,
457458
/// Arbitrary key–value pairs to attach to the service in the orchestrator
458459
/// backend.
459460
///
@@ -599,7 +600,7 @@ impl OrchestratorWorker {
599600
services.get(&id).map(|states| states.len())
600601
};
601602
match old_scale {
602-
Some(old) if old == usize::from(scale) => return Ok(()),
603+
Some(old) if old == usize::cast_from(scale) => return Ok(()),
603604
Some(_) => self.drop_service(&id).await?,
604605
None => (),
605606
}
@@ -622,7 +623,7 @@ impl OrchestratorWorker {
622623

623624
// Create the state for new processes.
624625
let mut process_states = vec![];
625-
for i in 0..scale.into() {
626+
for i in 0..usize::cast_from(scale) {
626627
let listen_addrs = &peer_addrs[i];
627628

628629
// Fill out placeholders in the command wrapper for this process.
@@ -1174,7 +1175,7 @@ impl From<ProcessStatus> for ServiceStatus {
11741175
}
11751176
}
11761177

1177-
fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
1178+
fn socket_path(run_dir: &Path, port: &str, process: u16) -> String {
11781179
let desired = run_dir
11791180
.join(format!("{port}-{process}"))
11801181
.to_string_lossy()
@@ -1199,13 +1200,13 @@ struct AddressedTcpListener {
11991200
#[derive(Debug, Clone)]
12001201
struct ProcessService {
12011202
run_dir: PathBuf,
1202-
scale: u16,
1203+
scale: NonZero<u16>,
12031204
}
12041205

12051206
impl Service for ProcessService {
12061207
fn addresses(&self, port: &str) -> Vec<String> {
1207-
(0..self.scale)
1208-
.map(|i| socket_path(&self.run_dir, port, i.into()))
1208+
(0..self.scale.get())
1209+
.map(|i| socket_path(&self.run_dir, port, i))
12091210
.collect()
12101211
}
12111212
}

src/orchestrator/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
use std::collections::BTreeMap;
1111
use std::fmt;
12+
use std::num::NonZero;
1213
use std::str::FromStr;
1314
use std::sync::Arc;
1415

@@ -211,7 +212,7 @@ pub struct ServiceConfig {
211212
/// An optional limit on the CPU that the service can use.
212213
pub cpu_limit: Option<CpuLimit>,
213214
/// The number of copies of this service to run.
214-
pub scale: u16,
215+
pub scale: NonZero<u16>,
215216
/// Arbitrary key–value pairs to attach to the service in the orchestrator
216217
/// backend.
217218
///

0 commit comments

Comments (0)