Skip to content

Commit e53caaa

Browse files
authored
Merge pull request #34340 from aljoscha/push-ozkukkxmlxwo
adapter: add histogram metrics for more coordinator methods
2 parents c230726 + 6ef7bdf commit e53caaa

File tree

6 files changed

+85
-9
lines changed

6 files changed

+85
-9
lines changed

src/adapter/src/coord/catalog_implications.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
2929
use std::collections::{BTreeMap, BTreeSet};
3030
use std::sync::Arc;
31-
use std::time::Duration;
31+
use std::time::{Duration, Instant};
3232

3333
use fail::fail_point;
3434
use itertools::Itertools;
@@ -82,6 +82,8 @@ impl Coordinator {
8282
ctx: Option<&mut ExecuteContext>,
8383
catalog_updates: Vec<ParsedStateUpdate>,
8484
) -> Result<(), AdapterError> {
85+
let start = Instant::now();
86+
8587
let mut catalog_implications: BTreeMap<CatalogItemId, CatalogImplication> = BTreeMap::new();
8688
let mut cluster_commands: BTreeMap<ClusterId, CatalogImplication> = BTreeMap::new();
8789
let mut cluster_replica_commands: BTreeMap<(ClusterId, ReplicaId), CatalogImplication> =
@@ -144,6 +146,10 @@ impl Coordinator {
144146
)
145147
.await?;
146148

149+
self.metrics
150+
.apply_catalog_implications_seconds
151+
.observe(start.elapsed().as_secs_f64());
152+
147153
Ok(())
148154
}
149155

src/adapter/src/coord/ddl.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
use std::collections::{BTreeMap, BTreeSet};
1414
use std::pin::Pin;
1515
use std::sync::Arc;
16-
use std::time::Duration;
16+
use std::time::{Duration, Instant};
1717

1818
use fail::fail_point;
1919
use maplit::{btreemap, btreeset};
@@ -67,8 +67,15 @@ impl Coordinator {
6767
session: Option<&Session>,
6868
ops: Vec<catalog::Op>,
6969
) -> Result<(), AdapterError> {
70-
self.catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
71-
.await
70+
let start = Instant::now();
71+
let result = self
72+
.catalog_transact_with_context(session.map(|session| session.conn_id()), None, ops)
73+
.await;
74+
self.metrics
75+
.catalog_transact_seconds
76+
.with_label_values(&["catalog_transact"])
77+
.observe(start.elapsed().as_secs_f64());
78+
result
7279
}
7380

7481
/// Same as [`Self::catalog_transact_with_context`] but takes a [`Session`]
@@ -93,6 +100,8 @@ impl Coordinator {
93100
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
94101
+ 'static,
95102
{
103+
let start = Instant::now();
104+
96105
let (table_updates, catalog_updates) = self
97106
.catalog_transact_inner(ctx.as_ref().map(|ctx| ctx.session().conn_id()), ops)
98107
.await?;
@@ -129,6 +138,11 @@ impl Coordinator {
129138
)
130139
.await;
131140

141+
self.metrics
142+
.catalog_transact_seconds
143+
.with_label_values(&["catalog_transact_with_side_effects"])
144+
.observe(start.elapsed().as_secs_f64());
145+
132146
Ok(())
133147
}
134148

@@ -146,6 +160,8 @@ impl Coordinator {
146160
ctx: Option<&mut ExecuteContext>,
147161
ops: Vec<catalog::Op>,
148162
) -> Result<(), AdapterError> {
163+
let start = Instant::now();
164+
149165
let conn_id = conn_id.or_else(|| ctx.as_ref().map(|ctx| ctx.session().conn_id()));
150166

151167
let (table_updates, catalog_updates) = self.catalog_transact_inner(conn_id, ops).await?;
@@ -176,6 +192,11 @@ impl Coordinator {
176192
"coordinator inconsistency detected"
177193
);
178194

195+
self.metrics
196+
.catalog_transact_seconds
197+
.with_label_values(&["catalog_transact_with_context"])
198+
.observe(start.elapsed().as_secs_f64());
199+
179200
Ok(())
180201
}
181202

@@ -197,6 +218,8 @@ impl Coordinator {
197218
+ Sync
198219
+ 'static,
199220
{
221+
let start = Instant::now();
222+
200223
let Some(Transaction {
201224
ops:
202225
TransactionOps::DDL {
@@ -208,13 +231,22 @@ impl Coordinator {
208231
..
209232
}) = ctx.session().transaction().inner()
210233
else {
211-
return self
234+
let result = self
212235
.catalog_transact_with_side_effects(Some(ctx), ops, side_effect)
213236
.await;
237+
self.metrics
238+
.catalog_transact_seconds
239+
.with_label_values(&["catalog_transact_with_ddl_transaction"])
240+
.observe(start.elapsed().as_secs_f64());
241+
return result;
214242
};
215243

216244
// Make sure our Catalog hasn't changed since opening the transaction.
217245
if self.catalog().transient_revision() != *txn_revision {
246+
self.metrics
247+
.catalog_transact_seconds
248+
.with_label_values(&["catalog_transact_with_ddl_transaction"])
249+
.observe(start.elapsed().as_secs_f64());
218250
return Err(AdapterError::DDLTransactionRace);
219251
}
220252

@@ -227,7 +259,7 @@ impl Coordinator {
227259
// Run our Catalog transaction, but abort before committing.
228260
let result = self.catalog_transact(Some(ctx.session()), all_ops).await;
229261

230-
match result {
262+
let result = match result {
231263
// We purposefully fail with this error to prevent committing the transaction.
232264
Err(AdapterError::TransactionDryRun { new_ops, new_state }) => {
233265
// Sets these ops to our transaction, bailing if the Catalog has changed since we
@@ -244,7 +276,14 @@ impl Coordinator {
244276
}
245277
Ok(_) => unreachable!("unexpected success!"),
246278
Err(e) => Err(e),
247-
}
279+
};
280+
281+
self.metrics
282+
.catalog_transact_seconds
283+
.with_label_values(&["catalog_transact_with_ddl_transaction"])
284+
.observe(start.elapsed().as_secs_f64());
285+
286+
result
248287
}
249288

250289
/// Perform a catalog transaction. [`Coordinator::ship_dataflow`] must be

src/adapter/src/metrics.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct Metrics {
5050
pub pgwire_ensure_transaction_seconds: HistogramVec,
5151
pub catalog_snapshot_seconds: HistogramVec,
5252
pub pgwire_recv_scheduling_delay_ms: HistogramVec,
53+
pub catalog_transact_seconds: HistogramVec,
54+
pub apply_catalog_implications_seconds: Histogram,
5355
}
5456

5557
impl Metrics {
@@ -221,6 +223,17 @@ impl Metrics {
221223
help: "The time between a pgwire connection's receiver task being woken up by incoming data and getting polled.",
222224
var_labels: ["message_type"],
223225
buckets: histogram_milliseconds_buckets(0.128, 512000.),
226+
)),
227+
catalog_transact_seconds: registry.register(metric!(
228+
name: "mz_catalog_transact_seconds",
229+
help: "The time it takes to run various catalog transact methods.",
230+
var_labels: ["method"],
231+
buckets: histogram_seconds_buckets(0.001, 32.0),
232+
)),
233+
apply_catalog_implications_seconds: registry.register(metric!(
234+
name: "mz_apply_catalog_implications_seconds",
235+
help: "The time it takes to apply catalog implications.",
236+
buckets: histogram_seconds_buckets(0.001, 32.0),
224237
))
225238
}
226239
}

src/catalog/src/durable.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use std::fmt::Debug;
1313
use std::num::NonZeroI64;
1414
use std::sync::Arc;
15-
use std::time::Duration;
15+
use std::time::{Duration, Instant};
1616

1717
use async_trait::async_trait;
1818
use itertools::Itertools;
@@ -211,6 +211,9 @@ pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
211211
/// NB: We may remove this in later iterations of Pv2.
212212
fn epoch(&self) -> Epoch;
213213

214+
/// Returns the metrics for this catalog state.
215+
fn metrics(&self) -> &Metrics;
216+
214217
/// Politely releases all external resources that can only be released in an async context.
215218
async fn expire(self: Box<Self>);
216219

@@ -323,12 +326,16 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
323326
amount: u64,
324327
commit_ts: Timestamp,
325328
) -> Result<Vec<u64>, CatalogError> {
329+
let start = Instant::now();
326330
if amount == 0 {
327331
return Ok(Vec::new());
328332
}
329333
let mut txn = self.transaction().await?;
330334
let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
331335
txn.commit_internal(commit_ts).await?;
336+
self.metrics()
337+
.allocate_id_seconds
338+
.observe(start.elapsed().as_secs_f64());
332339
Ok(ids)
333340
}
334341

src/catalog/src/durable/metrics.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
1212
use mz_ore::metric;
1313
use mz_ore::metrics::{IntCounter, MetricsRegistry};
14-
use prometheus::{Counter, IntGaugeVec};
14+
use mz_ore::stats::histogram_seconds_buckets;
15+
use prometheus::{Counter, Histogram, IntGaugeVec};
1516

1617
#[derive(Debug, Clone)]
1718
pub struct Metrics {
@@ -23,6 +24,7 @@ pub struct Metrics {
2324
pub syncs: IntCounter,
2425
pub sync_latency_seconds: Counter,
2526
pub collection_entries: IntGaugeVec,
27+
pub allocate_id_seconds: Histogram,
2628
}
2729

2830
impl Metrics {
@@ -62,6 +64,11 @@ impl Metrics {
6264
help: "Total number of entries, after consolidation, per catalog collection.",
6365
var_labels: ["collection"],
6466
)),
67+
allocate_id_seconds: registry.register(metric!(
68+
name: "mz_catalog_allocate_id_seconds",
69+
help: "The time it takes to allocate IDs in the durable catalog.",
70+
buckets: histogram_seconds_buckets(0.001, 32.0),
71+
)),
6572
}
6673
}
6774
}

src/catalog/src/durable/persist.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1599,6 +1599,10 @@ impl ReadOnlyDurableCatalogState for PersistCatalogState {
15991599
.epoch
16001600
}
16011601

1602+
fn metrics(&self) -> &Metrics {
1603+
&self.metrics
1604+
}
1605+
16021606
#[mz_ore::instrument(level = "debug")]
16031607
async fn expire(self: Box<Self>) {
16041608
self.expire().await

0 commit comments

Comments
 (0)