Skip to content

Commit 1db0a34

Browse files
authored
Iceberg: introduce metrics (#34605)
These are by no means exhaustive but should give a decent starting place. I want to find a good way to measure latency to s3 and the catalog as well in a follow up. <!-- Describe the contents of the PR briefly but completely. If you write detailed commit messages, it is acceptable to copy/paste them here, or write "see commit messages for details." If there is only one commit in the PR, GitHub will have already added its commit message above. --> ### Motivation <!-- Which of the following best describes the motivation behind this PR? * This PR fixes a recognized bug. [Ensure issue is linked somewhere.] * This PR adds a known-desirable feature. [Ensure issue is linked somewhere.] * This PR fixes a previously unreported bug. [Describe the bug in detail, as if you were filing a bug report.] * This PR adds a feature that has not yet been specified. [Write a brief specification for the feature, including justification for its inclusion in Materialize, as if you were writing the original feature specification.] * This PR refactors existing code. [Describe what was wrong with the existing code, if it is not obvious.] --> ### Tips for reviewer <!-- Leave some tips for your reviewer, like: * The diff is much smaller if viewed with whitespace hidden. * [Some function/module/file] deserves extra attention. * [Some function/module/file] is pure code movement and only needs a skim. Delete this section if no tips. --> ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent 16c444b commit 1db0a34

File tree

4 files changed

+186
-1
lines changed

4 files changed

+186
-1
lines changed

src/storage/src/metrics.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,13 @@ impl StorageMetrics {
194194
) -> sink::kafka::KafkaSinkMetrics {
195195
sink::kafka::KafkaSinkMetrics::new(&self.sink_defs.kafka_defs, sink_id)
196196
}
197+
198+
/// Get an `IcebergSinkMetrics` for the given configuration.
199+
pub(crate) fn get_iceberg_sink_metrics(
200+
&self,
201+
sink_id: GlobalId,
202+
worker_id: usize,
203+
) -> sink::iceberg::IcebergSinkMetrics {
204+
sink::iceberg::IcebergSinkMetrics::new(&self.sink_defs.iceberg_defs, sink_id, worker_id)
205+
}
197206
}

src/storage/src/metrics/sink.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,22 @@
1111
1212
use mz_ore::metrics::MetricsRegistry;
1313

14+
pub mod iceberg;
1415
pub mod kafka;
1516

1617
/// A set of base metrics that hang off a central metrics registry, labeled by the sink they
1718
/// belong to.
1819
#[derive(Debug, Clone)]
1920
pub(crate) struct SinkMetricDefs {
2021
pub(crate) kafka_defs: kafka::KafkaSinkMetricDefs,
22+
pub(crate) iceberg_defs: iceberg::IcebergSinkMetricDefs,
2123
}
2224

2325
impl SinkMetricDefs {
2426
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
2527
Self {
2628
kafka_defs: kafka::KafkaSinkMetricDefs::register_with(registry),
29+
iceberg_defs: iceberg::IcebergSinkMetricDefs::register_with(registry),
2730
}
2831
}
2932
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Metrics for iceberg sinks.
11+
12+
use mz_ore::{
13+
metric,
14+
metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
15+
};
16+
use mz_repr::GlobalId;
17+
use prometheus::core::AtomicU64;
18+
19+
#[derive(Debug, Clone)]
20+
pub(crate) struct IcebergSinkMetricDefs {
21+
/// Number of rows written by the iceberg sink.
22+
pub rows_written: IntCounterVec,
23+
/// Number of rows deleted by the iceberg sink.
24+
pub rows_deleted: IntCounterVec,
25+
/// Number of data files written by the iceberg sink.
26+
pub data_files_written: IntCounterVec,
27+
/// Number of delete files written by the iceberg sink.
28+
pub delete_files_written: IntCounterVec,
29+
/// Number of stashed rows in the iceberg sink.
30+
pub stashed_rows: UIntGaugeVec,
31+
/// Total number of bytes written in data and delete files to object storage.
32+
pub bytes_written: IntCounterVec,
33+
/// Number of snapshots committed by the iceberg sink.
34+
pub snapshots_committed: IntCounterVec,
35+
/// Commit failures in the iceberg sink.
36+
pub commit_failures: IntCounterVec,
37+
/// Commit conflicts in the iceberg sink.
38+
pub commit_conflicts: IntCounterVec,
39+
}
40+
41+
impl IcebergSinkMetricDefs {
42+
// Every metric must have a worker specific id associated with it. These are later wrapped
43+
// in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
44+
// would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
45+
// workers may still be running, but the metrics registry will no longer record or report
46+
// metrics for that `source_id`.
47+
pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
48+
Self {
49+
rows_written: registry.register(metric!(
50+
name: "sink_iceberg_rows_written",
51+
help: "Number of rows written by the iceberg sink",
52+
var_labels: ["sink_id", "worker_id"]
53+
)),
54+
rows_deleted: registry.register(metric!(
55+
name: "sink_iceberg_rows_deleted",
56+
help: "Number of rows deleted by the iceberg sink",
57+
var_labels: ["sink_id", "worker_id"]
58+
)),
59+
data_files_written: registry.register(metric!(
60+
name: "sink_iceberg_data_files_written",
61+
help: "Number of data files written by the iceberg sink",
62+
var_labels: ["sink_id", "worker_id"]
63+
)),
64+
delete_files_written: registry.register(metric!(
65+
name: "sink_iceberg_delete_files_written",
66+
help: "Number of delete files written by the iceberg sink",
67+
var_labels: ["sink_id", "worker_id"]
68+
)),
69+
stashed_rows: registry.register(metric!(
70+
name: "sink_iceberg_stashed_rows",
71+
help: "Number of stashed rows in the iceberg sink",
72+
var_labels: ["sink_id", "worker_id"]
73+
)),
74+
bytes_written: registry.register(metric!(
75+
name: "sink_iceberg_bytes_written",
76+
help: "Number of bytes written by the iceberg sink",
77+
var_labels: ["sink_id", "worker_id"]
78+
)),
79+
snapshots_committed: registry.register(metric!(
80+
name: "sink_iceberg_snapshots_committed",
81+
help: "Number of snapshots committed by the iceberg sink",
82+
var_labels: ["sink_id", "worker_id"]
83+
)),
84+
commit_failures: registry.register(metric!(
85+
name: "sink_iceberg_commit_failures",
86+
help: "Number of commit failures in the iceberg sink",
87+
var_labels: ["sink_id", "worker_id"]
88+
)),
89+
commit_conflicts: registry.register(metric!(
90+
name: "sink_iceberg_commit_conflicts",
91+
help: "Number of commit conflicts in the iceberg sink",
92+
var_labels: ["sink_id", "worker_id"]
93+
)),
94+
}
95+
}
96+
}
97+
98+
#[derive(Clone)]
99+
pub(crate) struct IcebergSinkMetrics {
100+
/// Number of rows written by the iceberg sink.
101+
pub rows_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
102+
/// Number of rows deleted by the iceberg sink.
103+
pub rows_deleted: DeleteOnDropCounter<AtomicU64, Vec<String>>,
104+
/// Number of data files written by the iceberg sink.
105+
pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
106+
/// Number of delete files written by the iceberg sink.
107+
pub delete_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
108+
/// Number of stashed rows in the iceberg sink.
109+
pub stashed_rows: DeleteOnDropGauge<AtomicU64, Vec<String>>,
110+
/// Number of bytes written by the iceberg sink.
111+
pub bytes_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112+
/// Number of snapshots committed by the iceberg sink.
113+
pub snapshots_committed: DeleteOnDropCounter<AtomicU64, Vec<String>>,
114+
/// Number of commit failures in the iceberg sink.
115+
pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
116+
/// Number of commit conflicts in the iceberg sink.
117+
pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
118+
}
119+
120+
impl IcebergSinkMetrics {
121+
/// Create a `IcebergSinkMetrics` from the `IcebergSinkMetricDefs`.
122+
pub(crate) fn new(defs: &IcebergSinkMetricDefs, sink_id: GlobalId, worker_id: usize) -> Self {
123+
let labels = vec![sink_id.to_string(), worker_id.to_string()];
124+
Self {
125+
rows_written: defs.rows_written.get_delete_on_drop_metric(labels.clone()),
126+
rows_deleted: defs.rows_deleted.get_delete_on_drop_metric(labels.clone()),
127+
data_files_written: defs
128+
.data_files_written
129+
.get_delete_on_drop_metric(labels.clone()),
130+
delete_files_written: defs
131+
.delete_files_written
132+
.get_delete_on_drop_metric(labels.clone()),
133+
stashed_rows: defs.stashed_rows.get_delete_on_drop_metric(labels.clone()),
134+
bytes_written: defs.bytes_written.get_delete_on_drop_metric(labels.clone()),
135+
snapshots_committed: defs
136+
.snapshots_committed
137+
.get_delete_on_drop_metric(labels.clone()),
138+
commit_failures: defs
139+
.commit_failures
140+
.get_delete_on_drop_metric(labels.clone()),
141+
commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
142+
}
143+
}
144+
}

src/storage/src/sink/iceberg.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ use timely::dataflow::{Scope, Stream};
141141
use timely::progress::{Antichain, Timestamp as _};
142142

143143
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144+
use crate::metrics::sink::iceberg::IcebergSinkMetrics;
144145
use crate::render::sinks::SinkRender;
145146
use crate::storage_state::StorageState;
146147

@@ -757,6 +758,7 @@ fn write_data_files<G>(
757758
connection: IcebergSinkConnection,
758759
storage_configuration: StorageConfiguration,
759760
materialize_arrow_schema: Arc<ArrowSchema>,
761+
metrics: IcebergSinkMetrics,
760762
) -> (
761763
Stream<G, BoundedDataFile>,
762764
Stream<G, HealthStatusMessage>,
@@ -948,6 +950,7 @@ where
948950
{
949951
if let Some(rows) = stashed_rows.remove(&row_ts) {
950952
for (_row, diff_pair) in rows {
953+
metrics.stashed_rows.dec();
951954
let record_batch = row_to_recordbatch(
952955
diff_pair.clone(),
953956
Arc::clone(&arrow_schema),
@@ -1008,6 +1011,7 @@ where
10081011
if !written {
10091012
// ...otherwise stash it for later
10101013
let entry = stashed_rows.entry(row_ts).or_default();
1014+
metrics.stashed_rows.inc();
10111015
entry.push((row, diff_pair));
10121016
}
10131017
}
@@ -1040,6 +1044,18 @@ where
10401044
.await
10411045
.context("Failed to close DeltaWriter")?;
10421046
for data_file in data_files {
1047+
match data_file.content_type() {
1048+
iceberg::spec::DataContentType::Data => {
1049+
metrics.data_files_written.inc();
1050+
metrics.rows_written.inc_by(data_file.record_count());
1051+
}
1052+
iceberg::spec::DataContentType::PositionDeletes
1053+
| iceberg::spec::DataContentType::EqualityDeletes => {
1054+
metrics.delete_files_written.inc();
1055+
metrics.rows_deleted.inc_by(data_file.record_count());
1056+
}
1057+
}
1058+
metrics.bytes_written.inc_by(data_file.file_size_in_bytes());
10431059
let file = BoundedDataFile::new(
10441060
data_file,
10451061
current_schema.as_ref().clone(),
@@ -1084,6 +1100,7 @@ fn commit_to_iceberg<G>(
10841100
write_handle: impl Future<
10851101
Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
10861102
> + 'static,
1103+
metrics: IcebergSinkMetrics,
10871104
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
10881105
where
10891106
G: Scope<Timestamp = Timestamp>,
@@ -1233,6 +1250,7 @@ where
12331250
let new_table = tx.clone().commit(catalog.as_ref()).await;
12341251
match new_table {
12351252
Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1253+
metrics.commit_conflicts.inc();
12361254
let table = reload_table(
12371255
catalog.as_ref(),
12381256
connection.namespace.clone(),
@@ -1265,11 +1283,16 @@ where
12651283

12661284
RetryResult::Ok(table)
12671285
}
1268-
Err(e) => RetryResult::RetryableErr(anyhow!(e)),
1286+
Err(e) => {
1287+
metrics.commit_failures.inc();
1288+
RetryResult::RetryableErr(anyhow!(e))
1289+
},
12691290
Ok(table) => RetryResult::Ok(table)
12701291
}
12711292
}).await.context("failed to commit to iceberg")?;
12721293

1294+
metrics.snapshots_committed.inc();
1295+
12731296
let mut expect_upper = write_handle.shared_upper();
12741297
loop {
12751298
if PartialOrder::less_equal(&frontier, &expect_upper) {
@@ -1368,6 +1391,10 @@ impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
13681391
}
13691392
};
13701393

1394+
let metrics = storage_state
1395+
.metrics
1396+
.get_iceberg_sink_metrics(sink_id, scope.index());
1397+
13711398
let connection_for_minter = self.clone();
13721399
let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
13731400
format!("{sink_id}-iceberg-mint"),
@@ -1387,6 +1414,7 @@ impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
13871414
connection_for_writer,
13881415
storage_state.storage_configuration.clone(),
13891416
Arc::new(arrow_schema_with_ids.clone()),
1417+
metrics.clone(),
13901418
);
13911419

13921420
let connection_for_committer = self.clone();
@@ -1400,6 +1428,7 @@ impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
14001428
connection_for_committer,
14011429
storage_state.storage_configuration.clone(),
14021430
write_handle,
1431+
metrics.clone(),
14031432
);
14041433

14051434
let running_status = Some(HealthStatusMessage {

0 commit comments

Comments
 (0)