Skip to content

Commit 699cfdf

Browse files
committed
storage: allow sinks on multi-replica clusters
This PR extends the approach of MaterializeInc#31890 to the case of sinks. The handling is identical to that of single-replica sources. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent ba8427b commit 699cfdf

File tree

3 files changed

+214
-96
lines changed

3 files changed

+214
-96
lines changed

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,14 +1311,6 @@ impl Coordinator {
13111311
let (item_id, global_id) =
13121312
return_if_err!(self.catalog_mut().allocate_user_id(id_ts).await, ctx);
13131313

1314-
if let Some(cluster) = self.catalog().try_get_cluster(in_cluster) {
1315-
mz_ore::soft_assert_or_log!(
1316-
cluster.replica_ids().len() <= 1,
1317-
"cannot create sink in cluster {}; has >1 replicas",
1318-
cluster.id()
1319-
);
1320-
}
1321-
13221314
let catalog_sink = Sink {
13231315
create_sql: sink.create_sql,
13241316
global_id,

src/sql/src/plan/statement/ddl.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3416,9 +3416,6 @@ fn plan_sink(
34163416
// `in_cluster` value we plan to normalize when we canonicalize the create
34173417
// statement.
34183418
let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3419-
if in_cluster.replica_ids().len() > 1 {
3420-
sql_bail!("cannot create sink in cluster with more than one replica")
3421-
}
34223419
let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
34233420

34243421
Ok(Plan::CreateSink(CreateSinkPlan {
@@ -5317,23 +5314,19 @@ fn plan_drop_network_policy(
53175314
/// Returns `true` if the cluster has any object that requires a single replica.
53185315
/// Returns `false` if the cluster has no objects.
53195316
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5320-
cluster.bound_objects().iter().any(|id| {
5321-
let item = scx.catalog.get_item(id);
5322-
let single_replica_source = match item.source_desc() {
5323-
Ok(Some(desc)) => match desc.connection {
5324-
GenericSourceConnection::Kafka(_)
5325-
| GenericSourceConnection::LoadGenerator(_)
5326-
| GenericSourceConnection::MySql(_)
5327-
| GenericSourceConnection::Postgres(_) => {
5328-
let enable_multi_replica_sources =
5329-
ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
5330-
!enable_multi_replica_sources
5331-
}
5332-
},
5333-
_ => false,
5334-
};
5335-
single_replica_source || matches!(item.item_type(), CatalogItemType::Sink)
5336-
})
5317+
// If this feature is enabled, then all objects support multiple replicas
5318+
if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5319+
false
5320+
} else {
5321+
// Otherwise we check for the existence of sources or sinks
5322+
cluster.bound_objects().iter().any(|id| {
5323+
let item = scx.catalog.get_item(id);
5324+
matches!(
5325+
item.item_type(),
5326+
CatalogItemType::Sink | CatalogItemType::Source
5327+
)
5328+
})
5329+
}
53375330
}
53385331

53395332
fn plan_drop_cluster_replica(

0 commit comments

Comments
 (0)