Skip to content

Commit 3833722

Browse files
committed
adapter: handle AddConnection via catalog implications
1 parent 2a01a42 commit 3833722

File tree

2 files changed

+42
-35
lines changed

2 files changed

+42
-35
lines changed

src/adapter/src/coord/catalog_implications.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use fail::fail_point;
3434
use itertools::Itertools;
3535
use mz_adapter_types::compaction::CompactionWindow;
3636
use mz_catalog::builtin;
37+
use mz_cloud_resources::VpcEndpointConfig;
3738
use mz_catalog::memory::objects::{
3839
CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
3940
MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
@@ -181,6 +182,7 @@ impl Coordinator {
181182
let mut source_collections_to_create = BTreeMap::new();
182183
let mut storage_policies_to_initialize = BTreeMap::new();
183184
let mut execution_timestamps_to_set = BTreeSet::new();
185+
let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];
184186

185187
// We're incrementally migrating the code that manipulates the
186188
// controller from closures in the sequencer. For some types of catalog
@@ -380,17 +382,29 @@ impl Coordinator {
380382
secrets_to_drop.push(catalog_id);
381383
}
382384
CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
383-
tracing::debug!(?connection, "not handling AddConnection in here yet");
385+
match &connection.details {
386+
// SSH connections: key pair is stored in secrets_controller
387+
// BEFORE the catalog transaction, so no action needed here.
388+
ConnectionDetails::Ssh { .. } => {}
389+
// AWS PrivateLink connections: create the VPC endpoint
390+
ConnectionDetails::AwsPrivatelink(privatelink) => {
391+
let spec = VpcEndpointConfig {
392+
aws_service_name: privatelink.service_name.to_owned(),
393+
availability_zone_ids: privatelink.availability_zones.to_owned(),
394+
};
395+
vpc_endpoints_to_create.push((catalog_id, spec));
396+
}
397+
// Other connection types don't require post-transaction actions
398+
_ => {}
399+
}
384400
}
385401
CatalogImplication::Connection(CatalogImplicationKind::Altered {
386-
prev: prev_connection,
387-
new: new_connection,
402+
prev: _prev_connection,
403+
new: _new_connection,
388404
}) => {
389-
tracing::debug!(
390-
?prev_connection,
391-
?new_connection,
392-
"not handling AlterConnection in here yet"
393-
);
405+
// Connection alterations (like rotate keys) are handled via
406+
// secrets_controller without catalog changes to the connection
407+
// details structure, so no action needed here.
394408
}
395409
CatalogImplication::Connection(CatalogImplicationKind::Dropped(
396410
connection,
@@ -519,6 +533,22 @@ impl Coordinator {
519533
self.initialize_storage_collections(storage_policies_to_initialize)
520534
.await?;
521535

536+
// Create VPC endpoints for AWS PrivateLink connections
537+
if !vpc_endpoints_to_create.is_empty() {
538+
if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
539+
for (connection_id, spec) in vpc_endpoints_to_create {
540+
if let Err(err) = cloud_resource_controller
541+
.ensure_vpc_endpoint(connection_id, spec)
542+
.await
543+
{
544+
tracing::warn!(?err, "failed to ensure vpc endpoint!");
545+
}
546+
}
547+
} else {
548+
tracing::warn!("AWS PrivateLink connections unsupported without cloud_resource_controller");
549+
}
550+
}
551+
522552
let collections_to_drop: BTreeSet<_> = sources_to_drop
523553
.iter()
524554
.map(|(_, gid)| *gid)

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -785,34 +785,11 @@ impl Coordinator {
785785
owner_id: *ctx.session().current_role_id(),
786786
}];
787787

788+
// VPC endpoint creation for AWS PrivateLink connections is now handled
789+
// in apply_catalog_implications.
790+
let conn_id = ctx.session().conn_id().clone();
788791
let transact_result = self
789-
.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
790-
Box::pin(async move {
791-
match plan.connection.details {
792-
ConnectionDetails::AwsPrivatelink(ref privatelink) => {
793-
let spec = VpcEndpointConfig {
794-
aws_service_name: privatelink.service_name.to_owned(),
795-
availability_zone_ids: privatelink.availability_zones.to_owned(),
796-
};
797-
let cloud_resource_controller =
798-
match coord.cloud_resource_controller.as_ref().cloned() {
799-
Some(controller) => controller,
800-
None => {
801-
tracing::warn!("AWS PrivateLink connections unsupported");
802-
return;
803-
}
804-
};
805-
if let Err(err) = cloud_resource_controller
806-
.ensure_vpc_endpoint(connection_id, spec)
807-
.await
808-
{
809-
tracing::warn!(?err, "failed to ensure vpc endpoint!");
810-
}
811-
}
812-
_ => {}
813-
}
814-
})
815-
})
792+
.catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
816793
.await;
817794

818795
match transact_result {

0 commit comments

Comments
 (0)