Skip to content

Commit 45a9e27

Browse files
committed
adapter: enable dropping of replaced materialized views
Once materialized views can have multiple historical storage collections associated with them, we'll need to make sure to drop all of those when dropping a materialized view. There is always only a single compute collection to drop. This commit ensures we'll do the right thing and also simplifies the item dropping code a bit in the process: Instead of keeping track of indexes/MVs/CTs to drop, we instead only track compute and storage collections to drop. Doing so allows deleting a bunch of duplicate code.
1 parent 72dab4d commit 45a9e27

File tree

1 file changed

+16
-89
lines changed

1 file changed

+16
-89
lines changed

src/adapter/src/coord/ddl.rs

Lines changed: 16 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,7 @@ impl Coordinator {
214214
let mut webhook_sources_to_restart = BTreeSet::new();
215215
let mut table_gids_to_drop = vec![];
216216
let mut storage_sink_gids_to_drop = vec![];
217-
let mut indexes_to_drop = vec![];
218-
let mut materialized_views_to_drop = vec![];
219-
let mut continual_tasks_to_drop = vec![];
217+
let mut compute_gids_to_drop = vec![];
220218
let mut views_to_drop = vec![];
221219
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
222220
let mut secrets_to_drop = vec![];
@@ -275,21 +273,21 @@ impl Coordinator {
275273
storage_sink_gids_to_drop.push(sink.global_id());
276274
}
277275
CatalogItem::Index(index) => {
278-
indexes_to_drop.push((index.cluster_id, index.global_id()));
276+
compute_gids_to_drop
277+
.push((index.cluster_id, index.global_id()));
279278
}
280279
CatalogItem::MaterializedView(mv) => {
281-
materialized_views_to_drop
280+
compute_gids_to_drop
282281
.push((mv.cluster_id, mv.global_id_writes()));
282+
sources_to_drop
283+
.extend(mv.global_ids().map(|gid| (*id, gid)));
283284
}
284285
CatalogItem::View(view) => {
285286
views_to_drop.push((*id, view.clone()))
286287
}
287288
CatalogItem::ContinualTask(ct) => {
288-
continual_tasks_to_drop.push((
289-
*id,
290-
ct.cluster_id,
291-
ct.global_id(),
292-
));
289+
compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
290+
sources_to_drop.push((*id, ct.global_id()));
293291
}
294292
CatalogItem::Secret(_) => {
295293
secrets_to_drop.push(*id);
@@ -422,9 +420,7 @@ impl Coordinator {
422420
.map(|(_, gid)| *gid)
423421
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
424422
.chain(storage_sink_gids_to_drop.iter().copied())
425-
.chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
426-
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
427-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
423+
.chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
428424
.chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
429425
.collect();
430426

@@ -494,30 +490,19 @@ impl Coordinator {
494490
}
495491
}
496492

497-
let storage_ids_to_drop = sources_to_drop
493+
let storage_gids_to_drop = sources_to_drop
498494
.iter()
499495
.map(|(_, gid)| *gid)
500496
.chain(storage_sink_gids_to_drop.iter().copied())
501-
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
502-
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
503-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
504-
let compute_ids_to_drop = indexes_to_drop
505-
.iter()
506-
.copied()
507-
.chain(materialized_views_to_drop.iter().copied())
508-
.chain(
509-
continual_tasks_to_drop
510-
.iter()
511-
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
512-
);
497+
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid));
513498

514499
// Check if any Timelines would become empty, if we dropped the specified storage or
515500
// compute resources.
516501
//
517502
// Note: only after a Transaction succeeds do we actually drop the timeline
518503
let collection_id_bundle = self.build_collection_id_bundle(
519-
storage_ids_to_drop,
520-
compute_ids_to_drop,
504+
storage_gids_to_drop,
505+
compute_gids_to_drop.clone(),
521506
clusters_to_drop.clone(),
522507
);
523508
let timeline_associations: BTreeMap<_, _> = self
@@ -655,14 +640,8 @@ impl Coordinator {
655640
self.cancel_pending_copy(&conn_id);
656641
}
657642
}
658-
if !indexes_to_drop.is_empty() {
659-
self.drop_indexes(indexes_to_drop);
660-
}
661-
if !materialized_views_to_drop.is_empty() {
662-
self.drop_materialized_views(materialized_views_to_drop);
663-
}
664-
if !continual_tasks_to_drop.is_empty() {
665-
self.drop_continual_tasks(continual_tasks_to_drop);
643+
if !compute_gids_to_drop.is_empty() {
644+
self.drop_compute_collections(compute_gids_to_drop);
666645
}
667646
if !vpc_endpoints_to_drop.is_empty() {
668647
self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
@@ -1030,7 +1009,7 @@ impl Coordinator {
10301009
.unwrap_or_terminate("cannot fail to drop sinks");
10311010
}
10321011

1033-
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
1012+
pub(crate) fn drop_compute_collections(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
10341013
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
10351014
for (cluster_id, gid) in indexes {
10361015
by_cluster.entry(cluster_id).or_default().push(gid);
@@ -1046,58 +1025,6 @@ impl Coordinator {
10461025
}
10471026
}
10481027

1049-
/// A convenience method for dropping materialized views.
1050-
fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
1051-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1052-
let mut mv_gids = Vec::new();
1053-
for (cluster_id, gid) in mviews {
1054-
by_cluster.entry(cluster_id).or_default().push(gid);
1055-
mv_gids.push(gid);
1056-
}
1057-
1058-
// Drop compute sinks.
1059-
for (cluster_id, ids) in by_cluster {
1060-
let compute = &mut self.controller.compute;
1061-
// A cluster could have been dropped, so verify it exists.
1062-
if compute.instance_exists(cluster_id) {
1063-
compute
1064-
.drop_collections(cluster_id, ids)
1065-
.unwrap_or_terminate("cannot fail to drop collections");
1066-
}
1067-
}
1068-
1069-
// Drop storage resources.
1070-
let storage_metadata = self.catalog.state().storage_metadata();
1071-
self.controller
1072-
.storage
1073-
.drop_sources(storage_metadata, mv_gids)
1074-
.unwrap_or_terminate("cannot fail to drop sources");
1075-
}
1076-
1077-
/// A convenience method for dropping continual tasks.
1078-
fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
1079-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1080-
let mut source_ids = Vec::new();
1081-
for (item_id, cluster_id, gid) in cts {
1082-
by_cluster.entry(cluster_id).or_default().push(gid);
1083-
source_ids.push((item_id, gid));
1084-
}
1085-
1086-
// Drop compute sinks.
1087-
for (cluster_id, ids) in by_cluster {
1088-
let compute = &mut self.controller.compute;
1089-
// A cluster could have been dropped, so verify it exists.
1090-
if compute.instance_exists(cluster_id) {
1091-
compute
1092-
.drop_collections(cluster_id, ids)
1093-
.unwrap_or_terminate("cannot fail to drop collections");
1094-
}
1095-
}
1096-
1097-
// Drop storage sources.
1098-
self.drop_sources(source_ids)
1099-
}
1100-
11011028
fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
11021029
let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
11031030
.as_ref()

0 commit comments

Comments
 (0)