Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions src/adapter/src/coord/catalog_implications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mz_catalog::memory::objects::{
CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::CollectionExt;
Expand Down Expand Up @@ -181,6 +182,7 @@ impl Coordinator {
let mut source_collections_to_create = BTreeMap::new();
let mut storage_policies_to_initialize = BTreeMap::new();
let mut execution_timestamps_to_set = BTreeSet::new();
let mut vpc_endpoints_to_create: Vec<(CatalogItemId, VpcEndpointConfig)> = vec![];

// We're incrementally migrating the code that manipulates the
// controller from closures in the sequencer. For some types of catalog
Expand Down Expand Up @@ -380,17 +382,29 @@ impl Coordinator {
secrets_to_drop.push(catalog_id);
}
CatalogImplication::Connection(CatalogImplicationKind::Added(connection)) => {
tracing::debug!(?connection, "not handling AddConnection in here yet");
match &connection.details {
// SSH connections: key pair is stored in secrets_controller
// BEFORE the catalog transaction, so no action needed here.
ConnectionDetails::Ssh { .. } => {}
// AWS PrivateLink connections: create the VPC endpoint
ConnectionDetails::AwsPrivatelink(privatelink) => {
let spec = VpcEndpointConfig {
aws_service_name: privatelink.service_name.to_owned(),
availability_zone_ids: privatelink.availability_zones.to_owned(),
};
vpc_endpoints_to_create.push((catalog_id, spec));
}
// Other connection types don't require post-transaction actions
_ => {}
}
}
CatalogImplication::Connection(CatalogImplicationKind::Altered {
prev: prev_connection,
new: new_connection,
prev: _prev_connection,
new: _new_connection,
}) => {
tracing::debug!(
?prev_connection,
?new_connection,
"not handling AlterConnection in here yet"
);
// Connection alterations (like rotate keys) are handled via
// secrets_controller without catalog changes to the connection
// details structure, so no action needed here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with these things, but I see a ConnectionDetails::AwsPrivatelink code block in sequence_alter_connection_stage_finish, which does stuff with the cloud_resource_controller. (It seems similar to the migrated code block from sequence_create_connection_stage_finish.) Should that be also migrated here, similar to vpc_endpoints_to_create?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, the main difference seems to be that the code "transaction side effects" are not included in a catalog_transact_with_side_effects call, but are run after the transaction. Which... I think is functionally the same?

}
CatalogImplication::Connection(CatalogImplicationKind::Dropped(
connection,
Expand Down Expand Up @@ -519,6 +533,24 @@ impl Coordinator {
self.initialize_storage_collections(storage_policies_to_initialize)
.await?;

// Create VPC endpoints for AWS PrivateLink connections
if !vpc_endpoints_to_create.is_empty() {
if let Some(cloud_resource_controller) = self.cloud_resource_controller.as_ref() {
for (connection_id, spec) in vpc_endpoints_to_create {
if let Err(err) = cloud_resource_controller
.ensure_vpc_endpoint(connection_id, spec)
.await
{
tracing::warn!(?err, "failed to ensure vpc endpoint!");
}
}
} else {
tracing::warn!(
Comment on lines +544 to +548
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know both of these are warns in the original code as well, but I'm wondering if we shouldn't make them errors instead? Seems like we should complain pretty loudly if we fail here, since we've already reported to the user that the command was successful.

"AWS PrivateLink connections unsupported without cloud_resource_controller"
);
}
}

let collections_to_drop: BTreeSet<_> = sources_to_drop
.iter()
.map(|(_, gid)| *gid)
Expand Down
31 changes: 4 additions & 27 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,34 +785,11 @@ impl Coordinator {
owner_id: *ctx.session().current_role_id(),
}];

// VPC endpoint creation for AWS PrivateLink connections is now handled
// in apply_catalog_implications.
let conn_id = ctx.session().conn_id().clone();
let transact_result = self
.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
Box::pin(async move {
match plan.connection.details {
ConnectionDetails::AwsPrivatelink(ref privatelink) => {
let spec = VpcEndpointConfig {
aws_service_name: privatelink.service_name.to_owned(),
availability_zone_ids: privatelink.availability_zones.to_owned(),
};
let cloud_resource_controller =
match coord.cloud_resource_controller.as_ref().cloned() {
Some(controller) => controller,
None => {
tracing::warn!("AWS PrivateLink connections unsupported");
return;
}
};
if let Err(err) = cloud_resource_controller
.ensure_vpc_endpoint(connection_id, spec)
.await
{
tracing::warn!(?err, "failed to ensure vpc endpoint!");
}
}
_ => {}
}
})
})
.catalog_transact_with_context(Some(&conn_id), Some(ctx), ops)
.await;

match transact_result {
Expand Down