Skip to content

Commit e68d38e

Browse files
committed
adapter: enable dropping of replaced materialized views
Once materialized views can have multiple historical storage collections associated with them, we'll need to make sure to drop all of those when dropping a materialized view. There is always only a single compute collection to drop. This commit ensures we'll do the right thing and also simplifies the item dropping code a bit in the process: Instead of keeping track of indexes/MVs/CTs to drop, we instead only track compute and storage collections to drop. Doing so allows deleting a bunch of duplicate code.
1 parent 417c7f7 commit e68d38e

File tree

3 files changed

+15
-90
lines changed

3 files changed

+15
-90
lines changed

src/adapter/src/coord/catalog_implications.rs

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,7 @@ impl Coordinator {
159159
let mut sources_to_drop = vec![];
160160
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
161161
let mut storage_sink_gids_to_drop = vec![];
162-
let mut indexes_to_drop = vec![];
163-
let mut materialized_views_to_drop = vec![];
164-
let mut continual_tasks_to_drop = vec![];
162+
let mut compute_gids_to_drop = vec![];
165163
let mut view_gids_to_drop = vec![];
166164
let mut secrets_to_drop = vec![];
167165
let mut vpc_endpoints_to_drop = vec![];
@@ -305,7 +303,7 @@ impl Coordinator {
305303
);
306304
}
307305
CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
308-
indexes_to_drop.push((index.cluster_id, index.global_id()));
306+
compute_gids_to_drop.push((index.cluster_id, index.global_id()));
309307
dropped_item_names.insert(index.global_id(), full_name);
310308
}
311309
CatalogImplication::MaterializedView(CatalogImplicationKind::Added(mv)) => {
@@ -325,7 +323,8 @@ impl Coordinator {
325323
mv,
326324
full_name,
327325
)) => {
328-
materialized_views_to_drop.push((mv.cluster_id, mv.global_id_writes()));
326+
compute_gids_to_drop.push((mv.cluster_id, mv.global_id_writes()));
327+
sources_to_drop.extend(mv.global_ids().map(|gid| (catalog_id, gid)));
329328
dropped_item_names.insert(mv.global_id_writes(), full_name);
330329
}
331330
CatalogImplication::View(CatalogImplicationKind::Added(view)) => {
@@ -358,7 +357,8 @@ impl Coordinator {
358357
ct,
359358
_full_name,
360359
)) => {
361-
continual_tasks_to_drop.push((catalog_id, ct.cluster_id, ct.global_id()));
360+
compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
361+
sources_to_drop.push((catalog_id, ct.global_id()));
362362
}
363363
CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => {
364364
tracing::debug!(?secret, "not handling AddSecret in here yet");
@@ -524,9 +524,7 @@ impl Coordinator {
524524
.map(|(_, gid)| *gid)
525525
.chain(tables_to_drop.iter().map(|(_, gid)| *gid))
526526
.chain(storage_sink_gids_to_drop.iter().copied())
527-
.chain(indexes_to_drop.iter().map(|(_, id)| *id))
528-
.chain(materialized_views_to_drop.iter().map(|(_, id)| *id))
529-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
527+
.chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
530528
.chain(view_gids_to_drop.iter().copied())
531529
.collect();
532530

@@ -592,27 +590,14 @@ impl Coordinator {
592590
}
593591
}
594592

595-
let storage_ids_to_drop: BTreeSet<_> = sources_to_drop
593+
let storage_gids_to_drop: BTreeSet<_> = sources_to_drop
596594
.iter()
597595
.map(|(_id, gid)| gid)
598596
.chain(storage_sink_gids_to_drop.iter())
599597
.chain(tables_to_drop.iter().map(|(_id, gid)| gid))
600-
.chain(materialized_views_to_drop.iter().map(|(_, id)| id))
601-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| gid))
602598
.copied()
603599
.collect();
604600

605-
let compute_ids_to_drop: BTreeSet<_> = indexes_to_drop
606-
.iter()
607-
.copied()
608-
.chain(materialized_views_to_drop.iter().copied())
609-
.chain(
610-
continual_tasks_to_drop
611-
.iter()
612-
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
613-
)
614-
.collect();
615-
616601
// Gather resources that we have to remove from timeline state and
617602
// pre-check if any Timelines become empty, when we drop the specified
618603
// storage and compute resources.
@@ -624,13 +609,13 @@ impl Coordinator {
624609
let mut id_bundle = CollectionIdBundle::default();
625610

626611
for storage_id in read_holds.storage_ids() {
627-
if storage_ids_to_drop.contains(&storage_id) {
612+
if storage_gids_to_drop.contains(&storage_id) {
628613
id_bundle.storage_ids.insert(storage_id);
629614
}
630615
}
631616

632617
for (instance_id, id) in read_holds.compute_ids() {
633-
if compute_ids_to_drop.contains(&(instance_id, id))
618+
if compute_gids_to_drop.contains(&(instance_id, id))
634619
|| clusters_to_drop.contains(&instance_id)
635620
{
636621
id_bundle
@@ -713,16 +698,8 @@ impl Coordinator {
713698
}
714699
}
715700

716-
if !indexes_to_drop.is_empty() {
717-
self.drop_indexes(indexes_to_drop);
718-
}
719-
720-
if !materialized_views_to_drop.is_empty() {
721-
self.drop_materialized_views(materialized_views_to_drop);
722-
}
723-
724-
if !continual_tasks_to_drop.is_empty() {
725-
self.drop_continual_tasks(continual_tasks_to_drop);
701+
if !compute_gids_to_drop.is_empty() {
702+
self.drop_compute_collections(compute_gids_to_drop);
726703
}
727704

728705
if !vpc_endpoints_to_drop.is_empty() {

src/adapter/src/coord/ddl.rs

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -739,9 +739,9 @@ impl Coordinator {
739739
.unwrap_or_terminate("cannot fail to drop sinks");
740740
}
741741

742-
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
742+
pub(crate) fn drop_compute_collections(&mut self, collections: Vec<(ClusterId, GlobalId)>) {
743743
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
744-
for (cluster_id, gid) in indexes {
744+
for (cluster_id, gid) in collections {
745745
by_cluster.entry(cluster_id).or_default().push(gid);
746746
}
747747
for (cluster_id, gids) in by_cluster {
@@ -755,58 +755,6 @@ impl Coordinator {
755755
}
756756
}
757757

758-
/// A convenience method for dropping materialized views.
759-
pub(crate) fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
760-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
761-
let mut mv_gids = Vec::new();
762-
for (cluster_id, gid) in mviews {
763-
by_cluster.entry(cluster_id).or_default().push(gid);
764-
mv_gids.push(gid);
765-
}
766-
767-
// Drop compute sinks.
768-
for (cluster_id, ids) in by_cluster {
769-
let compute = &mut self.controller.compute;
770-
// A cluster could have been dropped, so verify it exists.
771-
if compute.instance_exists(cluster_id) {
772-
compute
773-
.drop_collections(cluster_id, ids)
774-
.unwrap_or_terminate("cannot fail to drop collections");
775-
}
776-
}
777-
778-
// Drop storage resources.
779-
let storage_metadata = self.catalog.state().storage_metadata();
780-
self.controller
781-
.storage
782-
.drop_sources(storage_metadata, mv_gids)
783-
.unwrap_or_terminate("cannot fail to drop sources");
784-
}
785-
786-
/// A convenience method for dropping continual tasks.
787-
pub(crate) fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
788-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
789-
let mut source_ids = Vec::new();
790-
for (item_id, cluster_id, gid) in cts {
791-
by_cluster.entry(cluster_id).or_default().push(gid);
792-
source_ids.push((item_id, gid));
793-
}
794-
795-
// Drop compute sinks.
796-
for (cluster_id, ids) in by_cluster {
797-
let compute = &mut self.controller.compute;
798-
// A cluster could have been dropped, so verify it exists.
799-
if compute.instance_exists(cluster_id) {
800-
compute
801-
.drop_collections(cluster_id, ids)
802-
.unwrap_or_terminate("cannot fail to drop collections");
803-
}
804-
}
805-
806-
// Drop storage sources.
807-
self.drop_sources(source_ids)
808-
}
809-
810758
pub(crate) fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
811759
let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
812760
.as_ref()

src/adapter/src/coord/peek.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ impl crate::coord::Coordinator {
861861
// If a dataflow was created, drop it once the peek command is sent.
862862
if let Some(index_id) = drop_dataflow {
863863
self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
864-
self.drop_indexes(vec![(compute_instance, index_id)]);
864+
self.drop_compute_collections(vec![(compute_instance, index_id)]);
865865
}
866866

867867
let persist_client = self.persist_client.clone();

0 commit comments

Comments
 (0)