Skip to content

Commit 308563a

Browse files
committed
compute,storage: annotate wallclock lag histogram with workload_class
This commit adds a "workload_class" label to measurements in `mz_wallclock_global_lag_histogram`. The value of the label is the workload class of the cluster maintaining the respective collection.
1 parent 68bc3fe commit 308563a

File tree

5 files changed

+100
-16
lines changed

5 files changed

+100
-16
lines changed

src/compute-client/src/controller/instance.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use mz_ore::{soft_assert_or_log, soft_panic_or_log};
3939
use mz_repr::adt::interval::Interval;
4040
use mz_repr::adt::timestamp::CheckedTimestamp;
4141
use mz_repr::refresh_schedule::RefreshSchedule;
42-
use mz_repr::{Datum, DatumMap, Diff, GlobalId, Row};
42+
use mz_repr::{Datum, Diff, GlobalId, Row};
4343
use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod};
4444
use mz_storage_types::read_holds::{self, ReadHold};
4545
use mz_storage_types::read_policy::ReadPolicy;
@@ -228,6 +228,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
228228
/// controlled by it are allowed to affect changes to external systems
229229
/// (largely persist).
230230
read_only: bool,
231+
/// The workload class of this instance.
232+
///
233+
/// This is currently only used to annotate metrics.
234+
workload_class: Option<String>,
231235
/// The replicas of this compute instance.
232236
replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
233237
/// Currently installed compute collections.
@@ -608,8 +612,13 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
608612
}
609613

610614
let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
615+
let histogram_labels = match &self.workload_class {
616+
Some(wc) => [("workload_class", wc.clone())].into(),
617+
None => BTreeMap::new(),
618+
};
611619

612620
let mut histogram_updates = Vec::new();
621+
let mut row_buf = Row::default();
613622
for (collection_id, collection) in &mut self.collections {
614623
if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
615624
continue;
@@ -619,19 +628,22 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
619628
let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f));
620629
let bucket = lag.as_secs().next_power_of_two();
621630

622-
let key = (histogram_period, bucket);
631+
let key = (histogram_period, bucket, histogram_labels.clone());
623632
*stash.entry(key).or_default() += 1;
624633

625634
if refresh_histogram {
626-
for ((period, lag), count) in std::mem::take(stash) {
627-
let row = Row::pack_slice(&[
635+
for ((period, lag, labels), count) in std::mem::take(stash) {
636+
let mut packer = row_buf.packer();
637+
packer.extend([
628638
Datum::TimestampTz(period.start),
629639
Datum::TimestampTz(period.end),
630640
Datum::String(&collection_id.to_string()),
631641
Datum::UInt64(lag),
632-
Datum::Map(DatumMap::empty()),
633642
]);
634-
histogram_updates.push((row, count));
643+
let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
644+
packer.push_dict(labels);
645+
646+
histogram_updates.push((row_buf.clone(), count));
635647
}
636648
}
637649
}
@@ -846,6 +858,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
846858
storage_collections: _,
847859
initialized,
848860
read_only,
861+
workload_class,
849862
replicas,
850863
collections,
851864
log_sources: _,
@@ -905,6 +918,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
905918
let map = serde_json::Map::from_iter([
906919
field("initialized", initialized)?,
907920
field("read_only", read_only)?,
921+
field("workload_class", workload_class)?,
908922
field("replicas", replicas)?,
909923
field("collections", collections)?,
910924
field("peeks", peeks)?,
@@ -968,6 +982,7 @@ where
968982
storage_collections: storage,
969983
initialized: false,
970984
read_only: true,
985+
workload_class: None,
971986
replicas: Default::default(),
972987
collections,
973988
log_sources,
@@ -1022,6 +1037,10 @@ where
10221037
/// Update instance configuration.
10231038
#[mz_ore::instrument(level = "debug")]
10241039
pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1040+
if let Some(workload_class) = &config_params.workload_class {
1041+
self.workload_class = workload_class.clone();
1042+
}
1043+
10251044
self.send(ComputeCommand::UpdateConfiguration(config_params));
10261045
}
10271046

@@ -2211,8 +2230,19 @@ struct CollectionState<T: ComputeControllerTimestamp> {
22112230
/// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
22122231
/// introspection update.
22132232
///
2233+
/// Keys are `(period, lag, labels)` triples, values are counts.
2234+
///
22142235
/// If this is `None`, wallclock lag is not tracked for this collection.
2215-
wallclock_lag_histogram_stash: Option<BTreeMap<(WallclockLagHistogramPeriod, u64), i64>>,
2236+
wallclock_lag_histogram_stash: Option<
2237+
BTreeMap<
2238+
(
2239+
WallclockLagHistogramPeriod,
2240+
u64,
2241+
BTreeMap<&'static str, String>,
2242+
),
2243+
i64,
2244+
>,
2245+
>,
22162246
}
22172247

22182248
impl<T: ComputeControllerTimestamp> CollectionState<T> {

src/controller/src/clusters.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ where
364364
id: ClusterId,
365365
workload_class: Option<String>,
366366
) -> Result<(), anyhow::Error> {
367+
self.storage
368+
.update_instance_workload_class(id, workload_class.clone());
367369
self.compute
368370
.update_instance_workload_class(id, workload_class)?;
369371
Ok(())

src/storage-client/src/controller.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,13 @@ pub trait StorageController: Debug {
383383
/// Panics if a storage instance with the given ID does not exist.
384384
fn drop_instance(&mut self, id: StorageInstanceId);
385385

386+
/// Updates a storage instance's workload class.
387+
fn update_instance_workload_class(
388+
&mut self,
389+
id: StorageInstanceId,
390+
workload_class: Option<String>,
391+
);
392+
386393
/// Connects the storage instance to the specified replica.
387394
///
388395
/// If the storage instance is already attached to a replica, communication

src/storage-controller/src/instance.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ use crate::history::CommandHistory;
5555
/// lead to panics.
5656
#[derive(Debug)]
5757
pub(crate) struct Instance<T> {
58+
/// The workload class of this instance.
59+
///
60+
/// This is currently only used to annotate metrics.
61+
pub workload_class: Option<String>,
5862
/// The replicas connected to this storage instance.
5963
replicas: BTreeMap<ReplicaId, Replica<T>>,
6064
/// The ingestions currently running on this instance.
@@ -114,6 +118,7 @@ where
114118
let epoch = ClusterStartupEpoch::new(envd_epoch, 0);
115119

116120
let mut instance = Self {
121+
workload_class: None,
117122
replicas: Default::default(),
118123
active_ingestions: Default::default(),
119124
ingestion_exports: Default::default(),

src/storage-controller/src/lib.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ use mz_persist_types::Codec64;
4848
use mz_proto::RustType;
4949
use mz_repr::adt::interval::Interval;
5050
use mz_repr::adt::timestamp::CheckedTimestamp;
51-
use mz_repr::{
52-
Datum, DatumMap, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation,
53-
};
51+
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
5452
use mz_storage_client::client::{
5553
ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunOneshotIngestion,
5654
RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse, TableData,
@@ -536,6 +534,19 @@ where
536534
assert!(instance.is_some(), "storage instance {id} does not exist");
537535
}
538536

537+
fn update_instance_workload_class(
538+
&mut self,
539+
id: StorageInstanceId,
540+
workload_class: Option<String>,
541+
) {
542+
let instance = self
543+
.instances
544+
.get_mut(&id)
545+
.unwrap_or_else(|| panic!("instance {id} does not exist"));
546+
547+
instance.workload_class = workload_class;
548+
}
549+
539550
fn connect_replica(
540551
&mut self,
541552
instance_id: StorageInstanceId,
@@ -3478,6 +3489,7 @@ where
34783489

34793490
let mut history_updates = Vec::new();
34803491
let mut histogram_updates = Vec::new();
3492+
let mut row_buf = Row::default();
34813493
for frontiers in self.storage_collections.active_collection_frontiers() {
34823494
let id = frontiers.id;
34833495
let Some(collection) = self.collections.get_mut(&id) else {
@@ -3507,19 +3519,36 @@ where
35073519

35083520
if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
35093521
let bucket = lag.as_secs().next_power_of_two();
3510-
let key = (histogram_period, bucket);
3522+
3523+
let instance_id = match &collection.extra_state {
3524+
CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3525+
CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3526+
CollectionStateExtra::None => None,
3527+
};
3528+
let workload_class = instance_id
3529+
.and_then(|id| self.instances.get(&id))
3530+
.and_then(|i| i.workload_class.clone());
3531+
let labels = match workload_class {
3532+
Some(wc) => [("workload_class", wc.clone())].into(),
3533+
None => BTreeMap::new(),
3534+
};
3535+
3536+
let key = (histogram_period, bucket, labels);
35113537
*stash.entry(key).or_default() += 1;
35123538

35133539
if refresh_histogram {
3514-
for ((period, lag), count) in std::mem::take(stash) {
3515-
let row = Row::pack_slice(&[
3540+
for ((period, lag, labels), count) in std::mem::take(stash) {
3541+
let mut packer = row_buf.packer();
3542+
packer.extend([
35163543
Datum::TimestampTz(period.start),
35173544
Datum::TimestampTz(period.end),
35183545
Datum::String(&id.to_string()),
35193546
Datum::UInt64(lag),
3520-
Datum::Map(DatumMap::empty()),
35213547
]);
3522-
histogram_updates.push((row, count));
3548+
let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3549+
packer.push_dict(labels);
3550+
3551+
histogram_updates.push((row_buf.clone(), count));
35233552
}
35243553
}
35253554
}
@@ -3666,8 +3695,19 @@ struct CollectionState<T: TimelyTimestamp> {
36663695
/// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
36673696
/// introspection update.
36683697
///
3698+
/// Keys are `(period, lag, labels)` triples, values are counts.
3699+
///
36693700
/// If this is `None`, wallclock lag is not tracked for this collection.
3670-
wallclock_lag_histogram_stash: Option<BTreeMap<(WallclockLagHistogramPeriod, u64), i64>>,
3701+
wallclock_lag_histogram_stash: Option<
3702+
BTreeMap<
3703+
(
3704+
WallclockLagHistogramPeriod,
3705+
u64,
3706+
BTreeMap<&'static str, String>,
3707+
),
3708+
i64,
3709+
>,
3710+
>,
36713711
/// Frontier wallclock lag metrics tracked for this collection.
36723712
wallclock_lag_metrics: WallclockLagMetrics,
36733713
}

0 commit comments

Comments
 (0)