@@ -38,6 +38,7 @@ use mz_catalog::memory::objects::{
3838 CatalogItem , Cluster , ClusterReplica , Connection , ContinualTask , DataSourceDesc , Index ,
3939 MaterializedView , Secret , Sink , Source , StateDiff , Table , TableDataSource , View ,
4040} ;
41+ use mz_cloud_resources:: VpcEndpointConfig ;
4142use mz_compute_client:: protocol:: response:: PeekResponse ;
4243use mz_controller_types:: { ClusterId , ReplicaId } ;
4344use mz_ore:: collections:: CollectionExt ;
@@ -181,6 +182,7 @@ impl Coordinator {
181182 let mut source_collections_to_create = BTreeMap :: new ( ) ;
182183 let mut storage_policies_to_initialize = BTreeMap :: new ( ) ;
183184 let mut execution_timestamps_to_set = BTreeSet :: new ( ) ;
185+ let mut vpc_endpoints_to_create: Vec < ( CatalogItemId , VpcEndpointConfig ) > = vec ! [ ] ;
184186
185187 // We're incrementally migrating the code that manipulates the
186188 // controller from closures in the sequencer. For some types of catalog
@@ -380,17 +382,29 @@ impl Coordinator {
380382 secrets_to_drop. push ( catalog_id) ;
381383 }
382384 CatalogImplication :: Connection ( CatalogImplicationKind :: Added ( connection) ) => {
383- tracing:: debug!( ?connection, "not handling AddConnection in here yet" ) ;
385+ match & connection. details {
386+ // SSH connections: key pair is stored in secrets_controller
387+ // BEFORE the catalog transaction, so no action needed here.
388+ ConnectionDetails :: Ssh { .. } => { }
389+ // AWS PrivateLink connections: create the VPC endpoint
390+ ConnectionDetails :: AwsPrivatelink ( privatelink) => {
391+ let spec = VpcEndpointConfig {
392+ aws_service_name : privatelink. service_name . to_owned ( ) ,
393+ availability_zone_ids : privatelink. availability_zones . to_owned ( ) ,
394+ } ;
395+ vpc_endpoints_to_create. push ( ( catalog_id, spec) ) ;
396+ }
397+ // Other connection types don't require post-transaction actions
398+ _ => { }
399+ }
384400 }
385401 CatalogImplication :: Connection ( CatalogImplicationKind :: Altered {
386- prev : prev_connection ,
387- new : new_connection ,
402+ prev : _prev_connection ,
403+ new : _new_connection ,
388404 } ) => {
389- tracing:: debug!(
390- ?prev_connection,
391- ?new_connection,
392- "not handling AlterConnection in here yet"
393- ) ;
405+ // Connection alterations (like rotate keys) are handled via
406+ // secrets_controller without catalog changes to the connection
407+ // details structure, so no action needed here.
394408 }
395409 CatalogImplication :: Connection ( CatalogImplicationKind :: Dropped (
396410 connection,
@@ -519,6 +533,24 @@ impl Coordinator {
519533 self . initialize_storage_collections ( storage_policies_to_initialize)
520534 . await ?;
521535
536+ // Create VPC endpoints for AWS PrivateLink connections
537+ if !vpc_endpoints_to_create. is_empty ( ) {
538+ if let Some ( cloud_resource_controller) = self . cloud_resource_controller . as_ref ( ) {
539+ for ( connection_id, spec) in vpc_endpoints_to_create {
540+ if let Err ( err) = cloud_resource_controller
541+ . ensure_vpc_endpoint ( connection_id, spec)
542+ . await
543+ {
544+ tracing:: warn!( ?err, "failed to ensure vpc endpoint!" ) ;
545+ }
546+ }
547+ } else {
548+ tracing:: warn!(
549+ "AWS PrivateLink connections unsupported without cloud_resource_controller"
550+ ) ;
551+ }
552+ }
553+
522554 let collections_to_drop: BTreeSet < _ > = sources_to_drop
523555 . iter ( )
524556 . map ( |( _, gid) | * gid)
0 commit comments