Skip to content

Commit 804de79

Browse files
committed
storage: allow singleton source on multi-replica clusters, schedule on one replica only
Implements MaterializeInc/database-issues#9079 Per the design in https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20250127_multi_replica_scheduling_singleton_sources.md
1 parent 386c837 commit 804de79

File tree

9 files changed

+350
-48
lines changed

9 files changed

+350
-48
lines changed

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -311,25 +311,9 @@ impl Coordinator {
311311
.in_cluster
312312
.expect("ingestion plans must specify cluster");
313313
match ingestion.desc.connection {
314-
GenericSourceConnection::Postgres(_) => {
315-
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
316-
if cluster.replica_ids().len() > 1 {
317-
return Err(AdapterError::Unsupported(
318-
"Postgres sources in clusters with >1 replicas",
319-
));
320-
}
321-
}
322-
}
323-
GenericSourceConnection::MySql(_) => {
324-
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
325-
if cluster.replica_ids().len() > 1 {
326-
return Err(AdapterError::Unsupported(
327-
"MySQL sources in clusters with >1 replicas",
328-
));
329-
}
330-
}
331-
}
332-
GenericSourceConnection::Kafka(_)
314+
GenericSourceConnection::Postgres(_)
315+
| GenericSourceConnection::MySql(_)
316+
| GenericSourceConnection::Kafka(_)
333317
| GenericSourceConnection::LoadGenerator(_) => {
334318
if let Some(cluster) = self.catalog().try_get_cluster(cluster_id) {
335319
let enable_multi_replica_sources = ENABLE_MULTI_REPLICA_SOURCES

src/sql/src/plan/statement/ddl.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5319,12 +5319,14 @@ fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn Catalog
53195319
let item = scx.catalog.get_item(id);
53205320
let single_replica_source = match item.source_desc() {
53215321
Ok(Some(desc)) => match desc.connection {
5322-
GenericSourceConnection::Kafka(_) | GenericSourceConnection::LoadGenerator(_) => {
5322+
GenericSourceConnection::Kafka(_)
5323+
| GenericSourceConnection::LoadGenerator(_)
5324+
| GenericSourceConnection::MySql(_)
5325+
| GenericSourceConnection::Postgres(_) => {
53235326
let enable_multi_replica_sources =
53245327
ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
53255328
!enable_multi_replica_sources
53265329
}
5327-
GenericSourceConnection::MySql(_) | GenericSourceConnection::Postgres(_) => true,
53285330
},
53295331
_ => false,
53305332
};

src/storage-controller/src/history.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
5050
}
5151

5252
/// Returns an iterator over the contained storage commands.
53-
pub fn iter(&self) -> impl Iterator<Item = &StorageCommand<T>> {
53+
pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
5454
self.commands.iter()
5555
}
5656

0 commit comments

Comments
 (0)