Skip to content

Commit 1c250df

Browse files
authored
Merge pull request #34560 from bkirwi/collections-cleanup
[storage] Collections cleanup
2 parents 76df7b3 + 5dee7c8 commit 1c250df

File tree

8 files changed

+108
-415
lines changed

8 files changed

+108
-415
lines changed

src/adapter/src/coord.rs

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2744,26 +2744,17 @@ impl Coordinator {
27442744
migrated_storage_collections: &BTreeSet<CatalogItemId>,
27452745
) {
27462746
let catalog = self.catalog();
2747-
let source_status_collection_id = catalog
2748-
.resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_SOURCE_STATUS_HISTORY);
2749-
let source_status_collection_id = catalog
2750-
.get_entry(&source_status_collection_id)
2751-
.latest_global_id();
27522747

27532748
let source_desc = |object_id: GlobalId,
27542749
data_source: &DataSourceDesc,
27552750
desc: &RelationDesc,
27562751
timeline: &Timeline| {
2757-
let (data_source, status_collection_id) = match data_source.clone() {
2752+
let data_source = match data_source.clone() {
27582753
// Re-announce the source description.
27592754
DataSourceDesc::Ingestion { desc, cluster_id } => {
27602755
let desc = desc.into_inline_connection(catalog.state());
27612756
let ingestion = IngestionDescription::new(desc, cluster_id, object_id);
2762-
2763-
(
2764-
DataSource::Ingestion(ingestion),
2765-
Some(source_status_collection_id),
2766-
)
2757+
DataSource::Ingestion(ingestion)
27672758
}
27682759
DataSourceDesc::OldSyntaxIngestion {
27692760
desc,
@@ -2787,10 +2778,7 @@ impl Coordinator {
27872778
};
27882779
ingestion.source_exports.insert(object_id, legacy_export);
27892780

2790-
(
2791-
DataSource::Ingestion(ingestion),
2792-
Some(source_status_collection_id),
2793-
)
2781+
DataSource::Ingestion(ingestion)
27942782
}
27952783
DataSourceDesc::IngestionExport {
27962784
ingestion_id,
@@ -2801,28 +2789,23 @@ impl Coordinator {
28012789
// TODO(parkmycar): We should probably check the type here, but I'm not sure if
28022790
// this will always be a Source or a Table.
28032791
let ingestion_id = catalog.get_entry(&ingestion_id).latest_global_id();
2804-
(
2805-
DataSource::IngestionExport {
2806-
ingestion_id,
2807-
details,
2808-
data_config: data_config.into_inline_connection(catalog.state()),
2809-
},
2810-
Some(source_status_collection_id),
2811-
)
2812-
}
2813-
DataSourceDesc::Webhook { .. } => {
2814-
(DataSource::Webhook, Some(source_status_collection_id))
2792+
2793+
DataSource::IngestionExport {
2794+
ingestion_id,
2795+
details,
2796+
data_config: data_config.into_inline_connection(catalog.state()),
2797+
}
28152798
}
2816-
DataSourceDesc::Progress => (DataSource::Progress, None),
2799+
DataSourceDesc::Webhook { .. } => DataSource::Webhook,
2800+
DataSourceDesc::Progress => DataSource::Progress,
28172801
DataSourceDesc::Introspection(introspection) => {
2818-
(DataSource::Introspection(introspection), None)
2802+
DataSource::Introspection(introspection)
28192803
}
28202804
};
28212805
CollectionDescription {
28222806
desc: desc.clone(),
28232807
data_source,
28242808
since: None,
2825-
status_collection_id,
28262809
timeline: Some(timeline.clone()),
28272810
primary: None,
28282811
}
@@ -2938,7 +2921,6 @@ impl Coordinator {
29382921
},
29392922
},
29402923
since: None,
2941-
status_collection_id: None,
29422924
timeline: None,
29432925
primary: None,
29442926
};

src/adapter/src/coord/catalog_implications.rs

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use std::time::{Duration, Instant};
3333
use fail::fail_point;
3434
use itertools::Itertools;
3535
use mz_adapter_types::compaction::CompactionWindow;
36-
use mz_catalog::builtin;
3736
use mz_catalog::memory::objects::{
3837
CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
3938
MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
@@ -1021,19 +1020,8 @@ impl Coordinator {
10211020
details,
10221021
data_config,
10231022
} => {
1024-
// TODO: It's a little weird that a table will be present in this
1025-
// source status collection, we might want to split out into a separate
1026-
// status collection.
1027-
let status_collection_id = self
1028-
.catalog()
1029-
.resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
1030-
10311023
let global_ingestion_id =
10321024
self.catalog().get_entry(ingestion_id).latest_global_id();
1033-
let global_status_collection_id = self
1034-
.catalog()
1035-
.get_entry(&status_collection_id)
1036-
.latest_global_id();
10371025

10381026
let collection_desc = CollectionDescription::<Timestamp> {
10391027
desc: table.desc.latest(),
@@ -1045,7 +1033,6 @@ impl Coordinator {
10451033
.into_inline_connection(self.catalog.state()),
10461034
},
10471035
since: None,
1048-
status_collection_id: Some(global_status_collection_id),
10491036
timeline: Some(timeline.clone()),
10501037
primary: None,
10511038
};
@@ -1091,7 +1078,6 @@ impl Coordinator {
10911078
desc,
10921079
data_source: DataSource::Webhook,
10931080
since: None,
1094-
status_collection_id: None, // Webhook tables don't use status collections
10951081
timeline: Some(timeline.clone()),
10961082
primary: None,
10971083
};
@@ -1233,16 +1219,7 @@ impl Coordinator {
12331219
source: Source,
12341220
compaction_windows: BTreeMap<CompactionWindow, BTreeSet<CatalogItemId>>,
12351221
) -> Result<(), AdapterError> {
1236-
let source_status_item_id = self
1237-
.catalog()
1238-
.resolve_builtin_storage_collection(&builtin::MZ_SOURCE_STATUS_HISTORY);
1239-
let source_status_collection_id = Some(
1240-
self.catalog()
1241-
.get_entry(&source_status_item_id)
1242-
.latest_global_id(),
1243-
);
1244-
1245-
let (data_source, status_collection_id) = match source.data_source {
1222+
let data_source = match source.data_source {
12461223
DataSourceDesc::Ingestion { desc, cluster_id } => {
12471224
let desc = desc.into_inline_connection(self.catalog().state());
12481225
let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();
@@ -1253,10 +1230,7 @@ impl Coordinator {
12531230
item_global_id,
12541231
);
12551232

1256-
(
1257-
DataSource::Ingestion(ingestion),
1258-
source_status_collection_id,
1259-
)
1233+
DataSource::Ingestion(ingestion)
12601234
}
12611235
DataSourceDesc::OldSyntaxIngestion {
12621236
desc,
@@ -1291,10 +1265,7 @@ impl Coordinator {
12911265
.source_exports
12921266
.insert(source.global_id, legacy_export);
12931267

1294-
(
1295-
DataSource::Ingestion(ingestion),
1296-
source_status_collection_id,
1297-
)
1268+
DataSource::Ingestion(ingestion)
12981269
}
12991270
DataSourceDesc::IngestionExport {
13001271
ingestion_id,
@@ -1305,17 +1276,15 @@ impl Coordinator {
13051276
// TODO(parkmycar): We should probably check the type here, but I'm not sure if
13061277
// this will always be a Source or a Table.
13071278
let ingestion_id = self.catalog().get_entry(&ingestion_id).latest_global_id();
1308-
(
1309-
DataSource::IngestionExport {
1310-
ingestion_id,
1311-
details,
1312-
data_config: data_config.into_inline_connection(self.catalog().state()),
1313-
},
1314-
source_status_collection_id,
1315-
)
1279+
1280+
DataSource::IngestionExport {
1281+
ingestion_id,
1282+
details,
1283+
data_config: data_config.into_inline_connection(self.catalog().state()),
1284+
}
13161285
}
1317-
DataSourceDesc::Progress => (DataSource::Progress, None),
1318-
DataSourceDesc::Webhook { .. } => (DataSource::Webhook, None),
1286+
DataSourceDesc::Progress => DataSource::Progress,
1287+
DataSourceDesc::Webhook { .. } => DataSource::Webhook,
13191288
DataSourceDesc::Introspection(_) => {
13201289
unreachable!("cannot create sources with introspection data sources")
13211290
}
@@ -1328,7 +1297,6 @@ impl Coordinator {
13281297
data_source,
13291298
timeline: Some(source.timeline),
13301299
since: None,
1331-
status_collection_id,
13321300
primary: None,
13331301
},
13341302
);

src/adapter/src/coord/ddl.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,6 @@ impl Coordinator {
10001000
},
10011001
},
10021002
since: None,
1003-
status_collection_id: None,
10041003
timeline: None,
10051004
primary: None,
10061005
};

src/compute-client/src/as_of_selection.rs

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -867,12 +867,10 @@ mod tests {
867867
use mz_storage_client::controller::{CollectionDescription, StorageMetadata, StorageTxn};
868868
use mz_storage_client::storage_collections::{CollectionFrontiers, SnapshotCursor};
869869
use mz_storage_types::StorageDiff;
870-
use mz_storage_types::connections::inline::InlinedConnection;
871870
use mz_storage_types::controller::{CollectionMetadata, StorageError};
872871
use mz_storage_types::errors::CollectionMissing;
873872
use mz_storage_types::parameters::StorageParameters;
874-
use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
875-
use mz_storage_types::sources::{SourceData, SourceExportDataConfig};
873+
use mz_storage_types::sources::SourceData;
876874
use mz_storage_types::time_dependence::{TimeDependence, TimeDependenceError};
877875
use timely::progress::Timestamp as TimelyTimestamp;
878876

@@ -1041,28 +1039,6 @@ mod tests {
10411039
unimplemented!()
10421040
}
10431041

1044-
async fn alter_ingestion_source_desc(
1045-
&self,
1046-
_ingestion_id: GlobalId,
1047-
_source_desc: SourceDesc,
1048-
) -> Result<(), StorageError<Self::Timestamp>> {
1049-
unimplemented!()
1050-
}
1051-
1052-
async fn alter_ingestion_export_data_configs(
1053-
&self,
1054-
_source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
1055-
) -> Result<(), StorageError<Self::Timestamp>> {
1056-
unimplemented!()
1057-
}
1058-
1059-
async fn alter_ingestion_connections(
1060-
&self,
1061-
_source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
1062-
) -> Result<(), StorageError<Self::Timestamp>> {
1063-
unimplemented!()
1064-
}
1065-
10661042
async fn alter_table_desc(
10671043
&self,
10681044
_existing_collection: GlobalId,

src/persist-client/src/usage.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,10 @@ impl StorageUsageClient {
250250
}
251251

252252
/// Computes [ShardUsageReferenced] for a given set of shards. Suitable for customer billing.
253-
pub async fn shards_usage_referenced<I>(&self, shard_ids: I) -> ShardsUsageReferenced
254-
where
255-
I: IntoIterator<Item = ShardId>,
256-
{
253+
pub async fn shards_usage_referenced(
254+
&self,
255+
shard_ids: impl IntoIterator<Item = ShardId>,
256+
) -> ShardsUsageReferenced {
257257
let semaphore = Arc::new(Semaphore::new(
258258
USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg),
259259
));

src/storage-client/src/controller.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ pub struct CollectionDescription<T> {
142142
pub data_source: DataSource<T>,
143143
/// An optional frontier to which the collection's `since` should be advanced.
144144
pub since: Option<Antichain<T>>,
145-
/// A GlobalId to use for this collection to use for the status collection.
146-
/// Used to keep track of source status/error information.
147-
pub status_collection_id: Option<GlobalId>,
148145
/// The timeline of the source. Absent for materialized views, continual tasks, etc.
149146
pub timeline: Option<Timeline>,
150147
/// The primary of this collection.
@@ -163,7 +160,6 @@ impl<T> CollectionDescription<T> {
163160
desc,
164161
data_source: DataSource::Other,
165162
since,
166-
status_collection_id: None,
167163
timeline: None,
168164
primary: None,
169165
}
@@ -175,7 +171,6 @@ impl<T> CollectionDescription<T> {
175171
desc,
176172
data_source: DataSource::Table,
177173
since: None,
178-
status_collection_id: None,
179174
timeline: Some(Timeline::EpochMilliseconds),
180175
primary: None,
181176
}

0 commit comments

Comments
 (0)