diff --git a/src/adapter/src/coord/catalog_implications.rs b/src/adapter/src/coord/catalog_implications.rs index 03448bd5adc53..faf9fe693aa38 100644 --- a/src/adapter/src/coord/catalog_implications.rs +++ b/src/adapter/src/coord/catalog_implications.rs @@ -38,6 +38,7 @@ use mz_catalog::memory::objects::{ CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index, MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View, }; +use mz_cloud_resources::VpcEndpointConfig; use mz_compute_client::protocol::response::PeekResponse; use mz_controller_types::{ClusterId, ReplicaId}; use mz_ore::collections::CollectionExt; @@ -181,6 +182,7 @@ impl Coordinator { let mut source_collections_to_create = BTreeMap::new(); let mut storage_policies_to_initialize = BTreeMap::new(); let mut execution_timestamps_to_set = BTreeSet::new(); + let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![]; // We're incrementally migrating the code that manipulates the // controller from closures in the sequencer. For some types of catalog @@ -360,18 +362,18 @@ impl Coordinator { )) => { continual_tasks_to_drop.push((catalog_id, ct.cluster_id, ct.global_id())); } - CatalogImplication::Secret(CatalogImplicationKind::Added(secret)) => { - tracing::debug!(?secret, "not handling AddSecret in here yet"); + CatalogImplication::Secret(CatalogImplicationKind::Added(_secret)) => { + // No action needed: the secret payload is stored in + // secrets_controller.ensure() BEFORE the catalog transaction. + // By the time we see this update, the secret is already stored. } CatalogImplication::Secret(CatalogImplicationKind::Altered { - prev: prev_secret, - new: new_secret, + prev: _prev_secret, + new: _new_secret, }) => { - tracing::debug!( - ?prev_secret, - ?new_secret, - "not handling AlterSecret in here yet" - ); + // No action needed: altering a secret updates the payload via + // secrets_controller.ensure() without a catalog transaction, + // so we shouldn't see AlterSecret updates here. } CatalogImplication::Secret(CatalogImplicationKind::Dropped( _secret, @@ -380,17 +382,29 @@ impl Coordinator { secrets_to_drop.push(catalog_id); } CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => { - tracing::debug!(?connection, "not handling AddConnection in here yet"); + match &connection.details { + // SSH connections: key pair is stored in secrets_controller + // BEFORE the catalog transaction, so no action needed here. + ConnectionDetails::Ssh { .. } => {} + // AWS PrivateLink connections: create the VPC endpoint + ConnectionDetails::AwsPrivatelink(privatelink) => { + let spec = VpcEndpointConfig { + aws_service_name: privatelink.service_name.to_owned(), + availability_zone_ids: privatelink.availability_zones.to_owned(), + }; + vpc_endpoints_to_create.push((catalog_id, spec)); + } + // Other connection types don't require post-transaction actions + _ => {} + } } CatalogImplication::Connection(CatalogImplicationKind::Altered { - prev: prev_connection, - new: new_connection, + prev: _prev_connection, + new: _new_connection, }) => { - tracing::debug!( - ?prev_connection, - ?new_connection, - "not handling AlterConnection in here yet" - ); + // Connection alterations (like rotate keys) are handled via + // secrets_controller without catalog changes to the connection + // details structure, so no action needed here. } CatalogImplication::Connection(CatalogImplicationKind::Dropped( connection, @@ -519,6 +533,24 @@ impl Coordinator { self.initialize_storage_collections(storage_policies_to_initialize) .await?; + // Create VPC endpoints for AWS PrivateLink connections + if !vpc_endpoints_to_create.is_empty() { + if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() { + for (connection_id, spec) in vpc_endpoints_to_create { + if let Err(err) = cloud_resource_controller + .ensure_vpc_endpoint(connection_id, spec) + .await + { + tracing::warn!(?err, "failed to ensure vpc endpoint!"); + } + } + } else { + tracing::warn!( + "AWS PrivateLink connections unsupported without cloud_resource_controller" + ); + } + } + let collections_to_drop: BTreeSet<_> = sources_to_drop .iter() .map(|(_, gid)| *gid) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ed7a427165db3..6bc9c53a71060 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -785,34 +785,11 @@ impl Coordinator { owner_id: *ctx.session().current_role_id(), }]; + // VPC endpoint creation for AWS PrivateLink connections is now handled + // in apply_catalog_implications. + let conn_id = ctx.session().conn_id().clone(); let transact_result = self - .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| { - Box::pin(async move { - match plan.connection.details { - ConnectionDetails::AwsPrivatelink(ref privatelink) => { - let spec = VpcEndpointConfig { - aws_service_name: privatelink.service_name.to_owned(), - availability_zone_ids: privatelink.availability_zones.to_owned(), - }; - let cloud_resource_controller = - match coord.cloud_resource_controller.as_ref().cloned() { - Some(controller) => controller, - None => { - tracing::warn!("AWS PrivateLink connections unsupported"); - return; - } - }; - if let Err(err) = cloud_resource_controller - .ensure_vpc_endpoint(connection_id, spec) - .await - { - tracing::warn!(?err, "failed to ensure vpc endpoint!"); - } - } - _ => {} - } - }) - }) + .catalog_transact_with_context(Some(&conn_id), Some(ctx), ops) .await; match transact_result {