Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions misc/helm-charts/operator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ The following table lists the configurable parameters of the Materialize operato
| `operator.cloudProvider.providers.gcp` | GCP Configuration (placeholder for future use) | ``{"enabled":false}`` |
| `operator.cloudProvider.region` | Common cloud provider settings | ``"kind"`` |
| `operator.cloudProvider.type` | Specifies cloud provider. Valid values are 'aws', 'gcp', 'azure' , 'generic', or 'local' | ``"local"`` |
| `operator.clusters.defaultReplicationFactor.analytics` | | ``0`` |
| `operator.clusters.defaultReplicationFactor.probe` | | ``0`` |
| `operator.clusters.defaultReplicationFactor.support` | | ``0`` |
| `operator.clusters.defaultReplicationFactor.system` | | ``0`` |
| `operator.clusters.defaultSizes.analytics` | | ``"25cc"`` |
| `operator.clusters.defaultSizes.catalogServer` | | ``"50cc"`` |
| `operator.clusters.defaultSizes.default` | | ``"25cc"`` |
Expand Down
12 changes: 12 additions & 0 deletions misc/helm-charts/operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ spec:
{{ if .Values.operator.clusters.defaultSizes.analytics }}
- "--bootstrap-builtin-analytics-cluster-replica-size={{ .Values.operator.clusters.defaultSizes.analytics }}"
{{- end }}
{{ if ne .Values.operator.clusters.defaultReplicationFactor.system nil }}
- "--bootstrap-builtin-system-cluster-replication-factor={{ .Values.operator.clusters.defaultReplicationFactor.system }}"
{{- end }}
{{ if ne .Values.operator.clusters.defaultReplicationFactor.probe nil }}
- "--bootstrap-builtin-probe-cluster-replication-factor={{ .Values.operator.clusters.defaultReplicationFactor.probe }}"
{{- end }}
{{ if ne .Values.operator.clusters.defaultReplicationFactor.support nil }}
- "--bootstrap-builtin-support-cluster-replication-factor={{ .Values.operator.clusters.defaultReplicationFactor.support }}"
{{- end }}
{{ if ne .Values.operator.clusters.defaultReplicationFactor.analytics nil }}
- "--bootstrap-builtin-analytics-cluster-replication-factor={{ .Values.operator.clusters.defaultReplicationFactor.analytics }}"
{{- end }}
{{- end }}
- "--image-pull-policy={{ kebabcase .Values.operator.image.pullPolicy }}"
{{- range $key, $value := .Values.environmentd.nodeSelector }}
Expand Down
5 changes: 5 additions & 0 deletions misc/helm-charts/operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ operator:
support: 25cc
catalogServer: 50cc
analytics: 25cc
defaultReplicationFactor:
system: 0
probe: 0
support: 0
analytics: 0

# Node selector to use for the operator pod
nodeSelector: {}
Expand Down
22 changes: 22 additions & 0 deletions src/adapter-types/src/bootstrap_builtin_cluster_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for bootstrap builtin cluster configuration.

/// Bootstrap-time configuration for a single builtin cluster.
///
/// Bundles the two per-cluster settings that are chosen when the builtin
/// clusters are first created: the replica size and the replication factor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapBuiltinClusterConfig {
    /// Name of the replica size to use for the cluster. It is resolved
    /// against the cluster replica size map when the cluster is created.
    pub size: String,
    /// Replication factor to create the cluster with. A value of `0` means
    /// no builtin replica is created for the cluster on bootstrap.
    pub replication_factor: u32,
}

/// Default replication factor for the builtin system cluster.
pub const SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR: u32 = 1;
/// Default replication factor for the builtin catalog server cluster.
pub const CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR: u32 = 1;
/// Default replication factor for the builtin probe cluster.
pub const PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR: u32 = 1;
/// Default replication factor for the builtin support cluster.
///
/// NOTE(review): defaults to 0, presumably because this cluster is only spun
/// up on demand when debugging for support -- confirm and document the
/// rationale here.
pub const SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR: u32 = 0;
/// Default replication factor for the builtin analytics cluster.
///
/// NOTE(review): defaults to 0, presumably because this cluster is only spun
/// up on demand to scrape analytics -- confirm and document the rationale
/// here.
pub const ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR: u32 = 0;
Comment on lines +23 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're here, can you add a comment explaining why these have a default of 0? e.g. that they are ephemeral clusters we spin up only to scrape analytics or for support when debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

1 change: 1 addition & 0 deletions src/adapter-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

//! Types for the adapter.
pub mod bootstrap_builtin_cluster_config;
pub mod compaction;
pub mod connection;
pub mod dyncfgs;
Expand Down
30 changes: 25 additions & 5 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use std::sync::Arc;
use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use itertools::Itertools;
use mz_adapter_types::bootstrap_builtin_cluster_config::{
BootstrapBuiltinClusterConfig, ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
};
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
use mz_build_info::DUMMY_BUILD_INFO;
Expand Down Expand Up @@ -690,11 +695,26 @@ impl Catalog {
boot_ts: previous_ts,
skip_migrations: true,
cluster_replica_sizes: bootstrap_args.cluster_replica_size_map.clone(),
builtin_system_cluster_replica_size: replica_size.clone(),
builtin_catalog_server_cluster_replica_size: replica_size.clone(),
builtin_probe_cluster_replica_size: replica_size.clone(),
builtin_support_cluster_replica_size: replica_size.clone(),
builtin_analytics_cluster_replica_size: replica_size.clone(),
builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
size: replica_size.clone(),
replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
},
builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
size: replica_size.clone(),
replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
},
builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
size: replica_size.clone(),
replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
},
builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
size: replica_size.clone(),
replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
},
builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
size: replica_size.clone(),
replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
},
Comment on lines +698 to +717
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifying these 5 fields all the time feels brittle, e.g. easy to mix up the config values for two different clusters. A fix might be to use newtypes, e.g.

pub struct SystemClusterReplicationFactor(usize);
pub struct CatalogServerClusterReplicationFactor(usize);
...

But that seems quite tedious too. If you feel inspired thinking about how we can make this more succinct might be nice, but definitely not blocking!

Copy link
Contributor Author

@SangJunBak SangJunBak Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifying these 5 fields all the time feels brittle

hmm my Rust knowledge is kinda capped here! Like the cluster_config variables ? Would that require a new BootstrapBuiltinClusterConfig struct per <builtin cluster>ClusterReplicationFactor(usize)? Or could you do something like:

... 
   builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                    size: replica_size.clone(),
                    replication_factor: AnalyticsClusterReplicationFactor (ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR),
                },

If we had to make a new ...ClusterConfig struct per builtin cluster, i kinda feel like it makes each config less generic which might be bad? Might be helpful to go over this in person since I'm genuinely curious!

system_parameter_defaults,
remote_system_parameters: None,
availability_zones: vec![],
Expand Down
105 changes: 60 additions & 45 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::{Duration, Instant};

use futures::future::{BoxFuture, FutureExt};
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
use mz_catalog::builtin::{
Builtin, Fingerprint, BUILTINS, BUILTIN_CLUSTERS, BUILTIN_CLUSTER_REPLICAS, BUILTIN_PREFIXES,
Expand Down Expand Up @@ -200,22 +201,22 @@ impl Catalog {
// Add any new builtin objects and remove old ones.
let (migrated_builtins, new_builtin_collections) =
add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
let cluster_sizes = BuiltinBootstrapClusterSizes {
system_cluster: config.builtin_system_cluster_replica_size,
catalog_server_cluster: config.builtin_catalog_server_cluster_replica_size,
probe_cluster: config.builtin_probe_cluster_replica_size,
support_cluster: config.builtin_support_cluster_replica_size,
analytics_cluster: config.builtin_analytics_cluster_replica_size,
let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
system_cluster: config.builtin_system_cluster_config,
catalog_server_cluster: config.builtin_catalog_server_cluster_config,
probe_cluster: config.builtin_probe_cluster_config,
support_cluster: config.builtin_support_cluster_config,
analytics_cluster: config.builtin_analytics_cluster_config,
};
add_new_remove_old_builtin_clusters_migration(
&mut txn,
&cluster_sizes,
&builtin_bootstrap_cluster_config_map,
&state.cluster_replica_sizes,
)?;
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
add_new_remove_old_builtin_cluster_replicas_migration(
&mut txn,
&cluster_sizes,
&builtin_bootstrap_cluster_config_map,
&state.cluster_replica_sizes,
)?;
add_new_remove_old_builtin_roles_migration(&mut txn)?;
Expand Down Expand Up @@ -855,7 +856,7 @@ fn add_new_remove_old_builtin_items_migration(

fn add_new_remove_old_builtin_clusters_migration(
txn: &mut mz_catalog::durable::Transaction<'_>,
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), mz_catalog::durable::CatalogError> {
let mut durable_clusters: BTreeMap<_, _> = txn
Expand All @@ -867,18 +868,19 @@ fn add_new_remove_old_builtin_clusters_migration(
// Add new clusters.
for builtin_cluster in BUILTIN_CLUSTERS {
if durable_clusters.remove(builtin_cluster.name).is_none() {
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_config.size)?;

txn.insert_system_cluster(
builtin_cluster.name,
vec![],
builtin_cluster.privileges.to_vec(),
builtin_cluster.owner_id.to_owned(),
mz_catalog::durable::ClusterConfig {
variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
size: cluster_size,
size: cluster_config.size,
availability_zones: vec![],
replication_factor: builtin_cluster.replication_factor,
replication_factor: cluster_config.replication_factor,
disk: cluster_allocation.is_cc,
logging: default_logging_config(),
optimizer_feature_overrides: Default::default(),
Expand Down Expand Up @@ -968,7 +970,7 @@ fn add_new_remove_old_builtin_roles_migration(

fn add_new_remove_old_builtin_cluster_replicas_migration(
txn: &mut Transaction<'_>,
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), AdapterError> {
let cluster_lookup: BTreeMap<_, _> = txn
Expand Down Expand Up @@ -996,12 +998,18 @@ fn add_new_remove_old_builtin_cluster_replicas_migration(
let replica_names = durable_replicas
.get_mut(&cluster.id)
.unwrap_or(&mut empty_map);
if replica_names.remove(builtin_replica.name).is_none() {

let builtin_cluster_boostrap_config =
builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
if replica_names.remove(builtin_replica.name).is_none()
// NOTE(SangJunBak): We need to explicitly check the replication factor because
// BUILTIN_CLUSTER_REPLICAS is constant throughout all deployments but the replication
// factor is configurable on bootstrap.
&& builtin_cluster_boostrap_config.replication_factor > 0
{
let replica_size = match cluster.config.variant {
ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
ClusterVariant::Unmanaged => {
builtin_cluster_sizes.get_size(builtin_replica.cluster_name)?
}
ClusterVariant::Unmanaged => builtin_cluster_boostrap_config.size,
};
let replica_allocation = cluster_sizes.get_allocation_by_name(&replica_size)?;

Expand Down Expand Up @@ -1115,37 +1123,44 @@ fn default_logging_config() -> ReplicaLogging {
interval: Some(Duration::from_secs(1)),
}
}
pub struct BuiltinBootstrapClusterSizes {
/// Size to default system_cluster on bootstrap
pub system_cluster: String,
/// Size to default catalog_server_cluster on bootstrap
pub catalog_server_cluster: String,
/// Size to default probe_cluster on bootstrap
pub probe_cluster: String,
/// Size to default support_cluster on bootstrap
pub support_cluster: String,

#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
/// Size and replication factor to default system_cluster on bootstrap
pub system_cluster: BootstrapBuiltinClusterConfig,
/// Size and replication factor to default catalog_server_cluster on bootstrap
pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
/// Size and replication factor to default probe_cluster on bootstrap
pub probe_cluster: BootstrapBuiltinClusterConfig,
/// Size and replication factor to default support_cluster on bootstrap
pub support_cluster: BootstrapBuiltinClusterConfig,
/// Size and replication factor to default analytics_cluster on bootstrap
pub analytics_cluster: String,
pub analytics_cluster: BootstrapBuiltinClusterConfig,
}

impl BuiltinBootstrapClusterSizes {
impl BuiltinBootstrapClusterConfigMap {
/// Gets the size and replication factor of the builtin cluster based on the provided name
fn get_size(&self, cluster_name: &str) -> Result<String, mz_catalog::durable::CatalogError> {
if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
Ok(self.system_cluster.clone())
} else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
Ok(self.catalog_server_cluster.clone())
} else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
Ok(self.probe_cluster.clone())
} else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
Ok(self.support_cluster.clone())
} else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
Ok(self.analytics_cluster.clone())
} else {
Err(mz_catalog::durable::CatalogError::Catalog(
SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
))
}
fn get_config(
&self,
cluster_name: &str,
) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
let cluster_config = match cluster_name {
name if name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name => &self.system_cluster,
name if name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name => {
&self.catalog_server_cluster
}
name if name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name => &self.probe_cluster,
name if name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name => &self.support_cluster,
name if name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name => {
&self.analytics_cluster
}
_ => {
return Err(mz_catalog::durable::CatalogError::Catalog(
SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
))
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it feels like this would be better written as if-else statements? If you want to prevent all of the Ok wrapping you can do:

let cluster_config = if cluster_name == "foo" {
    &self.foo_cluster
} else {
    return ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

Ok(cluster_config.clone())
}
}

Expand Down
31 changes: 16 additions & 15 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use futures::StreamExt;
use http::Uri;
use ipnet::IpNet;
use itertools::{Either, Itertools};
use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
Expand Down Expand Up @@ -998,11 +999,11 @@ pub struct Config {
pub cloud_resource_controller: Option<Arc<dyn CloudResourceController>>,
pub availability_zones: Vec<String>,
pub cluster_replica_sizes: ClusterReplicaSizeMap,
pub builtin_system_cluster_replica_size: String,
pub builtin_catalog_server_cluster_replica_size: String,
pub builtin_probe_cluster_replica_size: String,
pub builtin_support_cluster_replica_size: String,
pub builtin_analytics_cluster_replica_size: String,
pub builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
pub builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
pub builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
pub builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
pub builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,
pub system_parameter_defaults: BTreeMap<String, String>,
pub storage_usage_client: StorageUsageClient,
pub storage_usage_collection_interval: Duration,
Expand Down Expand Up @@ -3832,11 +3833,11 @@ pub fn serve(
secrets_controller,
cloud_resource_controller,
cluster_replica_sizes,
builtin_system_cluster_replica_size,
builtin_catalog_server_cluster_replica_size,
builtin_probe_cluster_replica_size,
builtin_support_cluster_replica_size,
builtin_analytics_cluster_replica_size,
builtin_system_cluster_config,
builtin_catalog_server_cluster_config,
builtin_probe_cluster_config,
builtin_support_cluster_config,
builtin_analytics_cluster_config,
system_parameter_defaults,
availability_zones,
storage_usage_client,
Expand Down Expand Up @@ -3986,11 +3987,11 @@ pub fn serve(
boot_ts: boot_ts.clone(),
skip_migrations: false,
cluster_replica_sizes,
builtin_system_cluster_replica_size,
builtin_catalog_server_cluster_replica_size,
builtin_probe_cluster_replica_size,
builtin_support_cluster_replica_size,
builtin_analytics_cluster_replica_size,
builtin_system_cluster_config,
builtin_catalog_server_cluster_config,
builtin_probe_cluster_config,
builtin_support_cluster_config,
builtin_analytics_cluster_config,
system_parameter_defaults,
remote_system_parameters,
availability_zones,
Expand Down
1 change: 1 addition & 0 deletions src/catalog-debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rust_binary(
version = "0.133.0-dev.0",
deps = [
"//src/adapter:mz_adapter",
"//src/adapter-types:mz_adapter_types",
"//src/build-info:mz_build_info",
"//src/catalog:mz_catalog",
"//src/cloud-resources:mz_cloud_resources",
Expand Down
1 change: 1 addition & 0 deletions src/catalog-debug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = "1.0.95"
clap = { version = "4.5.23", features = ["derive", "env"] }
futures = "0.3.25"
mz-adapter = { path = "../adapter" }
mz-adapter-types = { path = "../adapter-types" }
mz-build-info = { path = "../build-info" }
mz-catalog = { path = "../catalog" }
mz-cloud-resources = { path = "../cloud-resources" }
Expand Down
Loading