Skip to content

Commit 282ed16

Browse files
authored
Merge pull request #30713 from petrosagg/delete-legacy-reclocking
storage: delete old reclocking implementation
2 parents 33efcde + 2ba5483 commit 282ed16

File tree

9 files changed

+208
-1898
lines changed

9 files changed

+208
-1898
lines changed

misc/python/materialize/mzcompose/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ def get_default_system_parameters(
161161
"storage_statistics_collection_interval": "1000",
162162
"storage_statistics_interval": "2000",
163163
"storage_use_continual_feedback_upsert": "true",
164-
"storage_use_reclock_v2": "true",
165164
"timestamp_oracle": "postgres",
166165
"wait_catalog_consolidation_on_startup": "true",
167166
"with_0dt_deployment_max_wait": "900s",

src/storage-types/src/dyncfgs.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,6 @@ pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
205205
"Delay interval when reconnecting to a source / sink after halt.",
206206
);
207207

208-
/// Whether to use the new reclock implementation.
209-
pub const STORAGE_USE_RECLOCK_V2: Config<bool> = Config::new(
210-
"storage_use_reclock_v2",
211-
true,
212-
"Whether to use the new reclock implementation.",
213-
);
214-
215208
/// Whether to mint reclock bindings based on the latest probed frontier or the currently ingested
216209
/// frontier.
217210
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
@@ -249,7 +242,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
249242
.add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
250243
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
251244
.add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
252-
.add(&STORAGE_USE_RECLOCK_V2)
253245
.add(&STORAGE_RECLOCK_TO_LATEST)
254246
.add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
255247
}

src/storage/src/metrics.rs

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,12 @@
3030
3131
use std::sync::Arc;
3232

33-
use mz_ore::channel::{
34-
instrumented_unbounded_channel, InstrumentedUnboundedReceiver, InstrumentedUnboundedSender,
35-
};
3633
use mz_ore::metrics::MetricsRegistry;
37-
use mz_ore::metrics::{DeleteOnDropCounter, MetricVecExt};
3834
use mz_repr::GlobalId;
39-
use prometheus::core::AtomicU64;
4035

4136
use crate::statistics::{SinkStatisticsMetricDefs, SourceStatisticsMetricDefs};
4237
use mz_storage_operators::metrics::BackpressureMetrics;
4338

44-
pub mod channel;
4539
pub mod decode;
4640
pub mod sink;
4741
pub mod source;
@@ -190,39 +184,4 @@ impl StorageMetrics {
190184
) -> sink::kafka::KafkaSinkMetrics {
191185
sink::kafka::KafkaSinkMetrics::new(&self.sink_defs.kafka_defs, sink_id)
192186
}
193-
194-
/// Produce an instrumented channel for use in the source pipeline.
195-
pub(crate) fn get_instrumented_source_channel<T>(
196-
&self,
197-
id: GlobalId,
198-
worker_id: usize,
199-
worker_count: usize,
200-
location: &str,
201-
) -> (
202-
InstrumentedUnboundedSender<T, DeleteOnDropCounter<'static, AtomicU64, Vec<String>>>,
203-
InstrumentedUnboundedReceiver<T, DeleteOnDropCounter<'static, AtomicU64, Vec<String>>>,
204-
) {
205-
let sender_metric = self
206-
.source_defs
207-
.channel_metric_defs
208-
.sends
209-
.get_delete_on_drop_metric(vec![
210-
id.to_string(),
211-
worker_id.to_string(),
212-
worker_count.to_string(),
213-
location.to_string(),
214-
]);
215-
let recv_metric = self
216-
.source_defs
217-
.channel_metric_defs
218-
.recvs
219-
.get_delete_on_drop_metric(vec![
220-
id.to_string(),
221-
worker_id.to_string(),
222-
worker_count.to_string(),
223-
location.to_string(),
224-
]);
225-
226-
instrumented_unbounded_channel(sender_metric, recv_metric)
227-
}
228187
}

src/storage/src/metrics/channel.rs

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/storage/src/metrics/source.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ pub(crate) struct GeneralSourceMetricDefs {
3535
// Source metrics
3636
pub(crate) capability: UIntGaugeVec,
3737
pub(crate) resume_upper: IntGaugeVec,
38-
pub(crate) inmemory_remap_bindings: UIntGaugeVec,
3938
pub(crate) commit_upper_ready_times: UIntGaugeVec,
4039
pub(crate) commit_upper_accepted_times: UIntGaugeVec,
4140

@@ -69,11 +68,6 @@ impl GeneralSourceMetricDefs {
6968
help: "The timestamp-domain resumption frontier chosen for a source's ingestion",
7069
var_labels: ["source_id"],
7170
)),
72-
inmemory_remap_bindings: registry.register(metric!(
73-
name: "mz_source_inmemory_remap_bindings",
74-
help: "The number of in-memory remap bindings that reclocking a time needs to iterate over.",
75-
var_labels: ["source_id", "worker_id"],
76-
)),
7771
commit_upper_ready_times: registry.register(metric!(
7872
name: "mz_source_commit_upper_ready_times",
7973
help: "The number of ready remap bindings that are held in the reclock commit upper operator.",
@@ -130,8 +124,6 @@ pub(crate) struct SourceMetrics {
130124
pub(crate) capability: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
131125
/// The resume_upper for a source.
132126
pub(crate) resume_upper: DeleteOnDropGauge<'static, AtomicI64, Vec<String>>,
133-
/// The number of in-memory remap bindings that reclocking a time needs to iterate over.
134-
pub(crate) inmemory_remap_bindings: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
135127
/// The number of ready remap bindings that are held in the reclock commit upper operator.
136128
pub(crate) commit_upper_ready_times: DeleteOnDropGauge<'static, AtomicU64, Vec<String>>,
137129
/// The number of accepted remap bindings that are held in the reclock commit upper operator.
@@ -156,9 +148,6 @@ impl SourceMetrics {
156148
resume_upper: defs
157149
.resume_upper
158150
.get_delete_on_drop_metric(vec![source_id.to_string()]),
159-
inmemory_remap_bindings: defs
160-
.inmemory_remap_bindings
161-
.get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
162151
commit_upper_ready_times: defs
163152
.commit_upper_ready_times
164153
.get_delete_on_drop_metric(vec![source_id.to_string(), worker_id.to_string()]),
@@ -260,9 +249,6 @@ pub(crate) struct SourceMetricDefs {
260249
pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
261250
/// A cluster-wide counter shared across all sources.
262251
pub(crate) bytes_read: IntCounter,
263-
264-
// Additional metrics definitions;
265-
pub(crate) channel_metric_defs: super::channel::ChannelMetricDefs,
266252
}
267253

268254
impl SourceMetricDefs {
@@ -276,7 +262,6 @@ impl SourceMetricDefs {
276262
name: "mz_bytes_read_total",
277263
help: "Count of bytes read from sources",
278264
)),
279-
channel_metric_defs: super::channel::ChannelMetricDefs::register_with(registry),
280265
}
281266
}
282267
}

0 commit comments

Comments
 (0)