Skip to content

Commit d584a05

Browse files
committed
Add BootstrapBuiltinClusterConfig to support configurable cluster replication factor
This change allows configurable replication factors for builtin clusters during bootstrap. This will be useful for disabling certain clusters for self managed while also not breaking any of our test infra.
1 parent 109406c commit d584a05

File tree

14 files changed

+263
-138
lines changed

14 files changed

+263
-138
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Types for bootstrap builtin cluster configuration.
11+
12+
#[derive(Debug, Clone)]
13+
pub struct BootstrapBuiltinClusterConfig {
14+
pub size: String,
15+
pub replication_factor: u32,
16+
}

src/adapter-types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
//! Types for the adapter.
1111
12+
pub mod bootstrap_builtin_cluster_config;
1213
pub mod compaction;
1314
pub mod connection;
1415
pub mod dyncfgs;

src/adapter/src/catalog.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
use futures::future::BoxFuture;
2020
use futures::{Future, FutureExt};
2121
use itertools::Itertools;
22+
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
2223
use mz_adapter_types::connection::ConnectionId;
2324
use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
2425
use mz_build_info::DUMMY_BUILD_INFO;
@@ -669,6 +670,7 @@ impl Catalog {
669670
// debugging/testing.
670671
let previous_ts = now().into();
671672
let replica_size = &bootstrap_args.default_cluster_replica_size;
673+
let default_replication_factor = 1;
672674
let read_only = false;
673675
let OpenCatalogResult {
674676
catalog,
@@ -690,11 +692,26 @@ impl Catalog {
690692
boot_ts: previous_ts,
691693
skip_migrations: true,
692694
cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
693-
builtin_system_cluster_replica_size: replica_size.clone(),
694-
builtin_catalog_server_cluster_replica_size: replica_size.clone(),
695-
builtin_probe_cluster_replica_size: replica_size.clone(),
696-
builtin_support_cluster_replica_size: replica_size.clone(),
697-
builtin_analytics_cluster_replica_size: replica_size.clone(),
695+
builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
696+
size: replica_size.clone(),
697+
replication_factor: default_replication_factor,
698+
},
699+
builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
700+
size: replica_size.clone(),
701+
replication_factor: default_replication_factor,
702+
},
703+
builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
704+
size: replica_size.clone(),
705+
replication_factor: default_replication_factor,
706+
},
707+
builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
708+
size: replica_size.clone(),
709+
replication_factor: default_replication_factor,
710+
},
711+
builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
712+
size: replica_size.clone(),
713+
replication_factor: default_replication_factor,
714+
},
698715
system_parameter_defaults,
699716
remote_system_parameters: None,
700717
availability_zones: vec![],

src/adapter/src/catalog/open.rs

Lines changed: 60 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::time::{Duration, Instant};
1717

1818
use futures::future::{BoxFuture, FutureExt};
1919
use itertools::{Either, Itertools};
20+
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
2021
use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
2122
use mz_catalog::builtin::{
2223
Builtin, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS, BUILTIN_PREFIXES,
@@ -200,22 +201,22 @@ impl Catalog {
200201
// Add any new builtin objects and remove old ones.
201202
let (migrated_builtins, new_builtin_collections) =
202203
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
203-
let cluster_sizes = BuiltinBootstrapClusterSizes {
204-
system_cluster: config.builtin_system_cluster_replica_size,
205-
catalog_server_cluster: config.builtin_catalog_server_cluster_replica_size,
206-
probe_cluster: config.builtin_probe_cluster_replica_size,
207-
support_cluster: config.builtin_support_cluster_replica_size,
208-
analytics_cluster: config.builtin_analytics_cluster_replica_size,
204+
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
205+
system_cluster: config.builtin_system_cluster_config,
206+
catalog_server_cluster: config.builtin_catalog_server_cluster_config,
207+
probe_cluster: config.builtin_probe_cluster_config,
208+
support_cluster: config.builtin_support_cluster_config,
209+
analytics_cluster: config.builtin_analytics_cluster_config,
209210
};
210211
add_new_remove_old_builtin_clusters_migration(
211212
&mut txn,
212-
&cluster_sizes,
213+
&builtin_bootstrap_cluster_config_map,
213214
&state.cluster_replica_sizes,
214215
)?;
215216
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
216217
add_new_remove_old_builtin_cluster_replicas_migration(
217218
&mut txn,
218-
&cluster_sizes,
219+
&builtin_bootstrap_cluster_config_map,
219220
&state.cluster_replica_sizes,
220221
)?;
221222
add_new_remove_old_builtin_roles_migration(&mut txn)?;
@@ -855,7 +856,7 @@ fn add_new_remove_old_builtin_items_migration(
855856

856857
fn add_new_remove_old_builtin_clusters_migration(
857858
txn: &mut mz_catalog::durable::Transaction<'_>,
858-
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
859+
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
859860
cluster_sizes: &ClusterReplicaSizeMap,
860861
) -> Result<(), mz_catalog::durable::CatalogError> {
861862
let mut durable_clusters: BTreeMap<_, _> = txn
@@ -867,18 +868,19 @@ fn add_new_remove_old_builtin_clusters_migration(
867868
// Add new clusters.
868869
for builtin_cluster in BUILTIN_CLUSTERS {
869870
if durable_clusters.remove(builtin_cluster.name).is_none() {
870-
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
871-
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
871+
let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
872+
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_config.size)?;
873+
872874
txn.insert_system_cluster(
873875
builtin_cluster.name,
874876
vec![],
875877
builtin_cluster.privileges.to_vec(),
876878
builtin_cluster.owner_id.to_owned(),
877879
mz_catalog::durable::ClusterConfig {
878880
variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
879-
size: cluster_size,
881+
size: cluster_config.size,
880882
availability_zones: vec![],
881-
replication_factor: builtin_cluster.replication_factor,
883+
replication_factor: cluster_config.replication_factor,
882884
disk: cluster_allocation.is_cc,
883885
logging: default_logging_config(),
884886
optimizer_feature_overrides: Default::default(),
@@ -968,7 +970,7 @@ fn add_new_remove_old_builtin_roles_migration(
968970

969971
fn add_new_remove_old_builtin_cluster_replicas_migration(
970972
txn: &mut Transaction<'_>,
971-
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
973+
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
972974
cluster_sizes: &ClusterReplicaSizeMap,
973975
) -> Result<(), AdapterError> {
974976
let cluster_lookup: BTreeMap<_, _> = txn
@@ -996,12 +998,18 @@ fn add_new_remove_old_builtin_cluster_replicas_migration(
996998
let replica_names = durable_replicas
997999
.get_mut(&cluster.id)
9981000
.unwrap_or(&mut empty_map);
999-
if replica_names.remove(builtin_replica.name).is_none() {
1001+
1002+
let builtin_cluster_boostrap_config =
1003+
builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1004+
if replica_names.remove(builtin_replica.name).is_none()
1005+
// NOTE(SangJunBak): We need to explicitly check the replication factor because
1006+
// BUILT_IN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
1007+
// factor is configurable on bootstrap.
1008+
&& builtin_cluster_boostrap_config.replication_factor > 0
1009+
{
10001010
let replica_size = match cluster.config.variant {
10011011
ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1002-
ClusterVariant::Unmanaged => {
1003-
builtin_cluster_sizes.get_size(builtin_replica.cluster_name)?
1004-
}
1012+
ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
10051013
};
10061014
let replica_allocation = cluster_sizes.get_allocation_by_name(&replica_size)?;
10071015

@@ -1115,37 +1123,44 @@ fn default_logging_config() -> ReplicaLogging {
11151123
interval: Some(Duration::from_secs(1)),
11161124
}
11171125
}
1118-
pub struct BuiltinBootstrapClusterSizes {
1119-
/// Size to default system_cluster on bootstrap
1120-
pub system_cluster: String,
1121-
/// Size to default catalog_server_cluster on bootstrap
1122-
pub catalog_server_cluster: String,
1123-
/// Size to default probe_cluster on bootstrap
1124-
pub probe_cluster: String,
1125-
/// Size to default support_cluster on bootstrap
1126-
pub support_cluster: String,
1126+
1127+
#[derive(Debug)]
1128+
pub struct BuiltinBootstrapClusterConfigMap {
1129+
/// Size and replication factor to default system_cluster on bootstrap
1130+
pub system_cluster: BootstrapBuiltinClusterConfig,
1131+
/// Size and replication factor to default catalog_server_cluster on bootstrap
1132+
pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
1133+
/// Size and replication factor to default probe_cluster on bootstrap
1134+
pub probe_cluster: BootstrapBuiltinClusterConfig,
1135+
/// Size and replication factor to default support_cluster on bootstrap
1136+
pub support_cluster: BootstrapBuiltinClusterConfig,
11271137
/// Size to default analytics_cluster on bootstrap
1128-
pub analytics_cluster: String,
1138+
pub analytics_cluster: BootstrapBuiltinClusterConfig,
11291139
}
11301140

1131-
impl BuiltinBootstrapClusterSizes {
1141+
impl BuiltinBootstrapClusterConfigMap {
11321142
/// Gets the size of the builtin cluster based on the provided name
1133-
fn get_size(&self, cluster_name: &str) -> Result<String, mz_catalog::durable::CatalogError> {
1134-
if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1135-
Ok(self.system_cluster.clone())
1136-
} else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1137-
Ok(self.catalog_server_cluster.clone())
1138-
} else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1139-
Ok(self.probe_cluster.clone())
1140-
} else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1141-
Ok(self.support_cluster.clone())
1142-
} else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1143-
Ok(self.analytics_cluster.clone())
1144-
} else {
1145-
Err(mz_catalog::durable::CatalogError::Catalog(
1146-
SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1147-
))
1148-
}
1143+
fn get_config(
1144+
&self,
1145+
cluster_name: &str,
1146+
) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1147+
let cluster_config = match cluster_name {
1148+
name if name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name => &self.system_cluster,
1149+
name if name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name => {
1150+
&self.catalog_server_cluster
1151+
}
1152+
name if name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name => &self.probe_cluster,
1153+
name if name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name => &self.support_cluster,
1154+
name if name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name => {
1155+
&self.analytics_cluster
1156+
}
1157+
_ => {
1158+
return Err(mz_catalog::durable::CatalogError::Catalog(
1159+
SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1160+
))
1161+
}
1162+
};
1163+
Ok(cluster_config.clone())
11491164
}
11501165
}
11511166

src/adapter/src/coord.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ use futures::StreamExt;
8888
use http::Uri;
8989
use ipnet::IpNet;
9090
use itertools::{Either, Itertools};
91+
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
9192
use mz_adapter_types::compaction::CompactionWindow;
9293
use mz_adapter_types::connection::ConnectionId;
9394
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
@@ -998,11 +999,11 @@ pub struct Config {
998999
pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
9991000
pub availability_zones: Vec<String>,
10001001
pub cluster_replica_sizes: ClusterReplicaSizeMap,
1001-
pub builtin_system_cluster_replica_size: String,
1002-
pub builtin_catalog_server_cluster_replica_size: String,
1003-
pub builtin_probe_cluster_replica_size: String,
1004-
pub builtin_support_cluster_replica_size: String,
1005-
pub builtin_analytics_cluster_replica_size: String,
1002+
pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
1003+
pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
1004+
pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
1005+
pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
1006+
pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
10061007
pub system_parameter_defaults: BTreeMap<String, String>,
10071008
pub storage_usage_client: StorageUsageClient,
10081009
pub storage_usage_collection_interval: Duration,
@@ -3832,11 +3833,11 @@ pub fn serve(
38323833
secrets_controller,
38333834
cloud_resource_controller,
38343835
cluster_replica_sizes,
3835-
builtin_system_cluster_replica_size,
3836-
builtin_catalog_server_cluster_replica_size,
3837-
builtin_probe_cluster_replica_size,
3838-
builtin_support_cluster_replica_size,
3839-
builtin_analytics_cluster_replica_size,
3836+
builtin_system_cluster_config,
3837+
builtin_catalog_server_cluster_config,
3838+
builtin_probe_cluster_config,
3839+
builtin_support_cluster_config,
3840+
builtin_analytics_cluster_config,
38403841
system_parameter_defaults,
38413842
availability_zones,
38423843
storage_usage_client,
@@ -3986,11 +3987,11 @@ pub fn serve(
39863987
boot_ts: boot_ts.clone(),
39873988
skip_migrations: false,
39883989
cluster_replica_sizes,
3989-
builtin_system_cluster_replica_size,
3990-
builtin_catalog_server_cluster_replica_size,
3991-
builtin_probe_cluster_replica_size,
3992-
builtin_support_cluster_replica_size,
3993-
builtin_analytics_cluster_replica_size,
3990+
builtin_system_cluster_config,
3991+
builtin_catalog_server_cluster_config,
3992+
builtin_probe_cluster_config,
3993+
builtin_support_cluster_config,
3994+
builtin_analytics_cluster_config,
39943995
system_parameter_defaults,
39953996
remote_system_parameters,
39963997
availability_zones,

src/catalog-debug/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ rust_binary(
3232
version = "0.133.0-dev.0",
3333
deps = [
3434
"//src/adapter:mz_adapter",
35+
"//src/adapter-types:mz_adapter_types",
3536
"//src/build-info:mz_build_info",
3637
"//src/catalog:mz_catalog",
3738
"//src/cloud-resources:mz_cloud_resources",

src/catalog-debug/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ anyhow = "1.0.95"
1414
clap = { version = "4.5.23", features = ["derive", "env"] }
1515
futures = "0.3.25"
1616
mz-adapter = { path = "../adapter" }
17+
mz-adapter-types = { path = "../adapter-types" }
1718
mz-build-info = { path = "../build-info" }
1819
mz-catalog = { path = "../catalog" }
1920
mz-cloud-resources = { path = "../cloud-resources" }

src/catalog-debug/src/main.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use anyhow::Context;
2323
use clap::Parser;
2424
use futures::future::FutureExt;
2525
use mz_adapter::catalog::{Catalog, InitializeStateResult};
26+
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
2627
use mz_build_info::{build_info, BuildInfo};
2728
use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, StateConfig};
2829
use mz_catalog::durable::debug::{
@@ -565,6 +566,8 @@ async fn upgrade_check(
565566
.0
566567
.clone();
567568

569+
let builtin_clusters_default_replication_factor = 1;
570+
568571
let boot_ts = now().into();
569572
let read_only = true;
570573
// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
@@ -590,11 +593,26 @@ async fn upgrade_check(
590593
boot_ts,
591594
skip_migrations: false,
592595
cluster_replica_sizes,
593-
builtin_system_cluster_replica_size: builtin_clusters_replica_size.clone(),
594-
builtin_catalog_server_cluster_replica_size: builtin_clusters_replica_size.clone(),
595-
builtin_probe_cluster_replica_size: builtin_clusters_replica_size.clone(),
596-
builtin_support_cluster_replica_size: builtin_clusters_replica_size.clone(),
597-
builtin_analytics_cluster_replica_size: builtin_clusters_replica_size,
596+
builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
597+
size: builtin_clusters_replica_size.clone(),
598+
replication_factor: builtin_clusters_default_replication_factor,
599+
},
600+
builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
601+
size: builtin_clusters_replica_size.clone(),
602+
replication_factor: builtin_clusters_default_replication_factor,
603+
},
604+
builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
605+
size: builtin_clusters_replica_size.clone(),
606+
replication_factor: builtin_clusters_default_replication_factor,
607+
},
608+
builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
609+
size: builtin_clusters_replica_size.clone(),
610+
replication_factor: builtin_clusters_default_replication_factor,
611+
},
612+
builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
613+
size: builtin_clusters_replica_size.clone(),
614+
replication_factor: builtin_clusters_default_replication_factor,
615+
},
598616
system_parameter_defaults: Default::default(),
599617
remote_system_parameters: None,
600618
availability_zones: vec![],

0 commit comments

Comments
 (0)