Skip to content

Commit 6f2b59c

Browse files
committed
compute,storage: annotate wallclock lag histogram with workload_class
This commit adds a "workload_class" label to measurements in `mz_wallclock_global_lag_histogram`. The value of the label is the workload class of the cluster maintaining the respective collection.
1 parent c3da5fe commit 6f2b59c

File tree

5 files changed

+100
-16
lines changed

5 files changed

+100
-16
lines changed

src/compute-client/src/controller/instance.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use mz_ore::{soft_assert_or_log, soft_panic_or_log};
3939
use mz_repr::adt::interval::Interval;
4040
use mz_repr::adt::timestamp::CheckedTimestamp;
4141
use mz_repr::refresh_schedule::RefreshSchedule;
42-
use mz_repr::{Datum, DatumMap, Diff, GlobalId, Row};
42+
use mz_repr::{Datum, Diff, GlobalId, Row};
4343
use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod};
4444
use mz_storage_types::read_holds::{self, ReadHold};
4545
use mz_storage_types::read_policy::ReadPolicy;
@@ -228,6 +228,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
228228
/// controlled by it are allowed to affect changes to external systems
229229
/// (largely persist).
230230
read_only: bool,
231+
/// The workload class of this instance.
232+
///
233+
/// This is currently only used to annotate metrics.
234+
workload_class: Option<String>,
231235
/// The replicas of this compute instance.
232236
replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
233237
/// Currently installed compute collections.
@@ -608,26 +612,34 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
608612
}
609613

610614
let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
615+
let histogram_labels = match &self.workload_class {
616+
Some(wc) => [("workload_class", wc.clone())].into(),
617+
None => BTreeMap::new(),
618+
};
611619

612620
let mut histogram_updates = Vec::new();
621+
let mut row_buf = Row::default();
613622
for (collection_id, collection) in &mut self.collections {
614623
if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
615624
let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f));
616625
let bucket = lag.as_secs().next_power_of_two();
617626

618-
let key = (histogram_period, bucket);
627+
let key = (histogram_period, bucket, histogram_labels.clone());
619628
*stash.entry(key).or_default() += 1;
620629

621630
if refresh_histogram {
622-
for ((period, lag), count) in std::mem::take(stash) {
623-
let row = Row::pack_slice(&[
631+
for ((period, lag, labels), count) in std::mem::take(stash) {
632+
let mut packer = row_buf.packer();
633+
packer.extend([
624634
Datum::TimestampTz(period.start),
625635
Datum::TimestampTz(period.end),
626636
Datum::String(&collection_id.to_string()),
627637
Datum::UInt64(lag),
628-
Datum::Map(DatumMap::empty()),
629638
]);
630-
histogram_updates.push((row, count));
639+
let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
640+
packer.push_dict(labels);
641+
642+
histogram_updates.push((row_buf.clone(), count));
631643
}
632644
}
633645
}
@@ -842,6 +854,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
842854
storage_collections: _,
843855
initialized,
844856
read_only,
857+
workload_class,
845858
replicas,
846859
collections,
847860
log_sources: _,
@@ -901,6 +914,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
901914
let map = serde_json::Map::from_iter([
902915
field("initialized", initialized)?,
903916
field("read_only", read_only)?,
917+
field("workload_class", workload_class)?,
904918
field("replicas", replicas)?,
905919
field("collections", collections)?,
906920
field("peeks", peeks)?,
@@ -964,6 +978,7 @@ where
964978
storage_collections: storage,
965979
initialized: false,
966980
read_only: true,
981+
workload_class: None,
967982
replicas: Default::default(),
968983
collections,
969984
log_sources,
@@ -1018,6 +1033,10 @@ where
10181033
/// Update instance configuration.
10191034
#[mz_ore::instrument(level = "debug")]
10201035
pub fn update_configuration(&mut self, config_params: ComputeParameters) {
1036+
if let Some(workload_class) = &config_params.workload_class {
1037+
self.workload_class = workload_class.clone();
1038+
}
1039+
10211040
self.send(ComputeCommand::UpdateConfiguration(config_params));
10221041
}
10231042

@@ -2207,8 +2226,19 @@ struct CollectionState<T: ComputeControllerTimestamp> {
22072226
/// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
22082227
/// introspection update.
22092228
///
2229+
/// Keys are `(period, lag, labels)` triples, values are counts.
2230+
///
22102231
/// If this is `None`, wallclock lag is not tracked for this collection.
2211-
wallclock_lag_histogram_stash: Option<BTreeMap<(WallclockLagHistogramPeriod, u64), i64>>,
2232+
wallclock_lag_histogram_stash: Option<
2233+
BTreeMap<
2234+
(
2235+
WallclockLagHistogramPeriod,
2236+
u64,
2237+
BTreeMap<&'static str, String>,
2238+
),
2239+
i64,
2240+
>,
2241+
>,
22122242
}
22132243

22142244
impl<T: ComputeControllerTimestamp> CollectionState<T> {

src/controller/src/clusters.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ where
364364
id: ClusterId,
365365
workload_class: Option<String>,
366366
) -> Result<(), anyhow::Error> {
367+
self.storage
368+
.update_instance_workload_class(id, workload_class.clone());
367369
self.compute
368370
.update_instance_workload_class(id, workload_class)?;
369371
Ok(())

src/storage-client/src/controller.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,13 @@ pub trait StorageController: Debug {
383383
/// Panics if a storage instance with the given ID does not exist.
384384
fn drop_instance(&mut self, id: StorageInstanceId);
385385

386+
/// Updates a storage instance's workload class.
387+
fn update_instance_workload_class(
388+
&mut self,
389+
id: StorageInstanceId,
390+
workload_class: Option<String>,
391+
);
392+
386393
/// Connects the storage instance to the specified replica.
387394
///
388395
/// If the storage instance is already attached to a replica, communication

src/storage-controller/src/instance.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ use crate::history::CommandHistory;
5555
/// lead to panics.
5656
#[derive(Debug)]
5757
pub(crate) struct Instance<T> {
58+
/// The workload class of this instance.
59+
///
60+
/// This is currently only used to annotate metrics.
61+
pub workload_class: Option<String>,
5862
/// The replicas connected to this storage instance.
5963
replicas: BTreeMap<ReplicaId, Replica<T>>,
6064
/// The ingestions currently running on this instance.
@@ -114,6 +118,7 @@ where
114118
let epoch = ClusterStartupEpoch::new(envd_epoch, 0);
115119

116120
let mut instance = Self {
121+
workload_class: None,
117122
replicas: Default::default(),
118123
active_ingestions: Default::default(),
119124
ingestion_exports: Default::default(),

src/storage-controller/src/lib.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ use mz_persist_types::Codec64;
4949
use mz_proto::RustType;
5050
use mz_repr::adt::interval::Interval;
5151
use mz_repr::adt::timestamp::CheckedTimestamp;
52-
use mz_repr::{
53-
Datum, DatumMap, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation,
54-
};
52+
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, TimestampManipulation};
5553
use mz_storage_client::client::{
5654
ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunOneshotIngestion,
5755
RunSinkCommand, Status, StatusUpdate, StorageCommand, StorageResponse, TableData,
@@ -537,6 +535,19 @@ where
537535
assert!(instance.is_some(), "storage instance {id} does not exist");
538536
}
539537

538+
fn update_instance_workload_class(
539+
&mut self,
540+
id: StorageInstanceId,
541+
workload_class: Option<String>,
542+
) {
543+
let instance = self
544+
.instances
545+
.get_mut(&id)
546+
.unwrap_or_else(|| panic!("instance {id} does not exist"));
547+
548+
instance.workload_class = workload_class;
549+
}
550+
540551
fn connect_replica(
541552
&mut self,
542553
instance_id: StorageInstanceId,
@@ -3479,6 +3490,7 @@ where
34793490

34803491
let mut history_updates = Vec::new();
34813492
let mut histogram_updates = Vec::new();
3493+
let mut row_buf = Row::default();
34823494
for frontiers in self.storage_collections.active_collection_frontiers() {
34833495
let id = frontiers.id;
34843496
let Some(collection) = self.collections.get_mut(&id) else {
@@ -3504,19 +3516,36 @@ where
35043516

35053517
if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
35063518
let bucket = lag.as_secs().next_power_of_two();
3507-
let key = (histogram_period, bucket);
3519+
3520+
let instance_id = match &collection.extra_state {
3521+
CollectionStateExtra::Ingestion(i) => Some(i.instance_id),
3522+
CollectionStateExtra::Export(e) => Some(e.cluster_id()),
3523+
CollectionStateExtra::None => None,
3524+
};
3525+
let workload_class = instance_id
3526+
.and_then(|id| self.instances.get(&id))
3527+
.and_then(|i| i.workload_class.clone());
3528+
let labels = match workload_class {
3529+
Some(wc) => [("workload_class", wc.clone())].into(),
3530+
None => BTreeMap::new(),
3531+
};
3532+
3533+
let key = (histogram_period, bucket, labels);
35083534
*stash.entry(key).or_default() += 1;
35093535

35103536
if refresh_histogram {
3511-
for ((period, lag), count) in std::mem::take(stash) {
3512-
let row = Row::pack_slice(&[
3537+
for ((period, lag, labels), count) in std::mem::take(stash) {
3538+
let mut packer = row_buf.packer();
3539+
packer.extend([
35133540
Datum::TimestampTz(period.start),
35143541
Datum::TimestampTz(period.end),
35153542
Datum::String(&id.to_string()),
35163543
Datum::UInt64(lag),
3517-
Datum::Map(DatumMap::empty()),
35183544
]);
3519-
histogram_updates.push((row, count));
3545+
let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
3546+
packer.push_dict(labels);
3547+
3548+
histogram_updates.push((row_buf.clone(), count));
35203549
}
35213550
}
35223551
}
@@ -3663,8 +3692,19 @@ struct CollectionState<T: TimelyTimestamp> {
36633692
/// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
36643693
/// introspection update.
36653694
///
3695+
/// Keys are `(period, lag, labels)` triples, values are counts.
3696+
///
36663697
/// If this is `None`, wallclock lag is not tracked for this collection.
3667-
wallclock_lag_histogram_stash: Option<BTreeMap<(WallclockLagHistogramPeriod, u64), i64>>,
3698+
wallclock_lag_histogram_stash: Option<
3699+
BTreeMap<
3700+
(
3701+
WallclockLagHistogramPeriod,
3702+
u64,
3703+
BTreeMap<&'static str, String>,
3704+
),
3705+
i64,
3706+
>,
3707+
>,
36683708
/// Frontier wallclock lag metrics tracked for this collection.
36693709
wallclock_lag_metrics: WallclockLagMetrics,
36703710
}

0 commit comments

Comments
 (0)