diff --git a/Cargo.lock b/Cargo.lock index bba7f7b8f4ee5..95863221109de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4457,17 +4457,17 @@ dependencies = [ [[package]] name = "k8s-controller" -version = "0.6.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "797c7fe6c6426d508eaf7f84fd80859b46d9b5a9fa49d3c31c7455669c599a1f" +checksum = "0b99ed10ec81d75006ca38992b59e254e980e619a24cf421c147f42868a0e804" dependencies = [ "async-trait", "futures", - "k8s-openapi", "kube", "kube-runtime", "rand 0.9.2", "serde", + "thiserror 2.0.17", "tracing", ] diff --git a/src/cloud-resources/src/crd.rs b/src/cloud-resources/src/crd.rs index a47006799cb35..503fde8169b3d 100644 --- a/src/cloud-resources/src/crd.rs +++ b/src/cloud-resources/src/crd.rs @@ -9,18 +9,26 @@ //! Kubernetes custom resources +use std::collections::BTreeMap; use std::time::Duration; use futures::future::join_all; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; -use kube::api::{Patch, PatchParams}; -use kube::{Api, Client}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; use kube::{ + Api, Client, Resource, ResourceExt, + api::{ObjectMeta, Patch, PatchParams}, core::crd::merge_crds, runtime::{conditions, wait::await_condition}, }; +use rand::{Rng, distr::Uniform}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use tracing::{info, warn}; +use crate::crd::generated::cert_manager::certificates::{ + CertificateIssuerRef, CertificateSecretTemplate, +}; use mz_ore::retry::Retry; pub mod generated; @@ -28,6 +36,55 @@ pub mod materialize; #[cfg(feature = "vpc-endpoints")] pub mod vpc_endpoint; +// This is intentionally a subset of the fields of a Certificate. +// We do not want customers to configure options that may conflict with +// things we override or expand in our code. +#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct MaterializeCertSpec { + /// Additional DNS names the certificate will be valid for. + pub dns_names: Option>, + /// Duration the certificate will be requested for. + /// Value must be in units accepted by Go + /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration). + pub duration: Option, + /// Duration before expiration the certificate will be renewed. + /// Value must be in units accepted by Go + /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration). + pub renew_before: Option, + /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate. + pub issuer_ref: Option, + /// Additional annotations and labels to include in the Certificate object. + pub secret_template: Option, +} + +pub trait ManagedResource: Resource + Sized { + fn default_labels(&self) -> BTreeMap { + BTreeMap::new() + } + + fn managed_resource_meta(&self, name: String) -> ObjectMeta { + ObjectMeta { + namespace: Some(self.meta().namespace.clone().unwrap()), + name: Some(name), + labels: Some(self.default_labels()), + owner_references: Some(vec![owner_reference(self)]), + ..Default::default() + } + } +} + +fn owner_reference>(t: &T) -> OwnerReference { + OwnerReference { + api_version: T::api_version(&()).to_string(), + kind: T::kind(&()).to_string(), + name: t.name_unchecked(), + uid: t.uid().unwrap(), + block_owner_deletion: Some(true), + ..Default::default() + } +} + #[derive(Debug, Clone)] pub struct VersionedCrd { pub crds: Vec, @@ -99,3 +156,16 @@ async fn register_custom_resource( info!("Done registering {} crd", &crd_name); Ok(()) } + +pub fn new_resource_id() -> String { + // DNS-1035 names are supposed to be case insensitive, + // so we define our own character set, rather than use the + // built-in Alphanumeric distribution from rand, which + // includes both upper and lowercase letters. + const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; + rand::rng() + .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range")) + .take(10) + .map(|i| char::from(CHARSET[i])) + .collect() +} diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs index 2c520923e6f0c..352825a040d0d 100644 --- a/src/cloud-resources/src/crd/materialize.rs +++ b/src/cloud-resources/src/crd/materialize.rs @@ -13,51 +13,24 @@ use k8s_openapi::{ api::core::v1::{EnvVar, ResourceRequirements}, apimachinery::pkg::{ api::resource::Quantity, - apis::meta::v1::{Condition, OwnerReference, Time}, + apis::meta::v1::{Condition, Time}, }, }; -use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta}; -use rand::Rng; -use rand::distr::Uniform; +use kube::{CustomResource, Resource, ResourceExt}; use schemars::JsonSchema; use semver::Version; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id}; use mz_server_core::listeners::AuthenticatorKind; -use crate::crd::generated::cert_manager::certificates::{ - CertificateIssuerRef, CertificateSecretTemplate, -}; - pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str = "materialize.cloud/last-known-active-generation"; pub mod v1alpha1 { - use super::*; - // This is intentionally a subset of the fields of a Certificate. - // We do not want customers to configure options that may conflict with - // things we override or expand in our code. - #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)] - #[serde(rename_all = "camelCase")] - pub struct MaterializeCertSpec { - /// Additional DNS names the certificate will be valid for. - pub dns_names: Option>, - /// Duration the certificate will be requested for. - /// Value must be in units accepted by Go - /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration). - pub duration: Option, - /// Duration before expiration the certificate will be renewed. - /// Value must be in units accepted by Go - /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration). - pub renew_before: Option, - /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate. - pub issuer_ref: Option, - /// Additional annotations and labels to include in the Certificate object. - pub secret_template: Option, - } #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)] pub enum MaterializeRolloutStrategy { /// Create a new generation of pods, leaving the old generation around until the @@ -368,23 +341,6 @@ pub mod v1alpha1 { }) } - pub fn default_labels(&self) -> BTreeMap { - BTreeMap::from_iter([ - ( - "materialize.cloud/organization-name".to_owned(), - self.name_unchecked(), - ), - ( - "materialize.cloud/organization-namespace".to_owned(), - self.namespace(), - ), - ( - "materialize.cloud/mz-resource-id".to_owned(), - self.resource_id().to_owned(), - ), - ]) - } - pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String { format!( "{}-{}-{}-0", @@ -544,29 +500,11 @@ pub mod v1alpha1 { } } - pub fn managed_resource_meta(&self, name: String) -> ObjectMeta { - ObjectMeta { - namespace: Some(self.namespace()), - name: Some(name), - labels: Some(self.default_labels()), - owner_references: Some(vec![owner_reference(self)]), - ..Default::default() - } - } - pub fn status(&self) -> MaterializeStatus { self.status.clone().unwrap_or_else(|| { let mut status = MaterializeStatus::default(); - // DNS-1035 names are supposed to be case insensitive, - // so we define our own character set, rather than use the - // built-in Alphanumeric distribution from rand, which - // includes both upper and lowercase letters. - const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789"; - status.resource_id = rand::rng() - .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range")) - .take(10) - .map(|i| char::from(CHARSET[i])) - .collect(); + + status.resource_id = new_resource_id(); // If we're creating the initial status on an un-soft-deleted // Environment we need to ensure that the last active generation @@ -626,6 +564,25 @@ pub mod v1alpha1 { a != b } } + + impl ManagedResource for Materialize { + fn default_labels(&self) -> BTreeMap { + BTreeMap::from_iter([ + ( + "materialize.cloud/organization-name".to_owned(), + self.name_unchecked(), + ), + ( + "materialize.cloud/organization-namespace".to_owned(), + self.namespace(), + ), + ( + "materialize.cloud/mz-resource-id".to_owned(), + self.resource_id().to_owned(), + ), + ]) + } + } } fn parse_image_ref(image_ref: &str) -> Option { @@ -642,17 +599,6 @@ fn parse_image_ref(image_ref: &str) -> Option { }) } -fn owner_reference>(t: &T) -> OwnerReference { - OwnerReference { - api_version: T::api_version(&()).to_string(), - kind: T::kind(&()).to_string(), - name: t.name_unchecked(), - uid: t.uid().unwrap(), - block_owner_deletion: Some(true), - ..Default::default() - } -} - #[cfg(test)] mod tests { use kube::core::ObjectMeta; diff --git a/src/orchestratord/Cargo.toml b/src/orchestratord/Cargo.toml index 0416d353eaca0..94407d4f47cd6 100644 --- a/src/orchestratord/Cargo.toml +++ b/src/orchestratord/Cargo.toml @@ -17,7 +17,7 @@ chrono = { version = "0.4.39", default-features = false } clap = { version = "4.5.23", features = ["derive"] } futures = "0.3.31" http = "1.2.0" -k8s-controller = "0.6.1" +k8s-controller = "0.8.0" k8s-openapi = { version = "0.26.0", features = ["v1_31"] } kube = { version = "2.0.1", default-features = false, features = ["client", "runtime", "ws"] } maplit = "1.0.2" diff --git a/src/orchestratord/src/bin/orchestratord.rs b/src/orchestratord/src/bin/orchestratord.rs index 76d8e47e756d3..f24984acde40a 100644 --- a/src/orchestratord/src/bin/orchestratord.rs +++ b/src/orchestratord/src/bin/orchestratord.rs @@ -13,20 +13,26 @@ use std::{ sync::{Arc, LazyLock}, }; -use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition; +use http::HeaderValue; +use k8s_openapi::{ + api::core::v1::{Affinity, ResourceRequirements, Toleration}, + apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition, +}; use kube::runtime::watcher; +use mz_cloud_provider::CloudProvider; use tracing::info; use mz_build_info::{BuildInfo, build_info}; -use mz_orchestrator_kubernetes::util::create_client; +use mz_orchestrator_kubernetes::{KubernetesImagePullPolicy, util::create_client}; use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs}; use mz_orchestratord::{ controller, k8s::register_crds, metrics::{self, Metrics}, + tls::DefaultCertificateSpecs, }; use mz_ore::{ - cli::{self, CliConfig}, + cli::{self, CliConfig, KeyValueArg}, error::ErrorExt, metrics::MetricsRegistry, }; @@ -45,8 +51,159 @@ pub struct Args { #[clap(long, default_value = "[::]:3100")] metrics_listen_address: SocketAddr, - #[clap(flatten)] - materialize_controller_args: controller::materialize::MaterializeControllerArgs, + #[clap(long)] + cloud_provider: CloudProvider, + #[clap(long)] + region: String, + #[clap(long)] + create_balancers: bool, + #[clap(long)] + create_console: bool, + #[clap(long)] + helm_chart_version: Option, + #[clap(long, default_value = "kubernetes")] + secrets_controller: String, + #[clap(long)] + collect_pod_metrics: bool, + #[clap(long)] + enable_prometheus_scrape_annotations: bool, + #[clap(long)] + disable_authentication: bool, + + #[clap(long)] + segment_api_key: Option, + #[clap(long)] + segment_client_side: bool, + + #[clap(long)] + console_image_tag_default: String, + #[clap(long)] + console_image_tag_map: Vec>, + + #[clap(long)] + aws_account_id: Option, + #[clap(long)] + environmentd_iam_role_arn: Option, + #[clap(long)] + environmentd_connection_role_arn: Option, + #[clap(long)] + aws_secrets_controller_tags: Vec, + #[clap(long)] + environmentd_availability_zones: Option>, + + #[clap(long)] + ephemeral_volume_class: Option, + #[clap(long)] + scheduler_name: Option, + #[clap(long)] + enable_security_context: bool, + #[clap(long)] + enable_internal_statement_logging: bool, + #[clap(long, default_value = "false")] + disable_statement_logging: bool, + + #[clap(long)] + orchestratord_pod_selector_labels: Vec>, + #[clap(long)] + environmentd_node_selector: Vec>, + #[clap(long, value_parser = parse_affinity)] + environmentd_affinity: Option, + #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)] + environmentd_tolerations: Option>, + #[clap(long, value_parser = parse_resources)] + environmentd_default_resources: Option, + #[clap(long)] + clusterd_node_selector: Vec>, + #[clap(long, value_parser = parse_affinity)] + clusterd_affinity: Option, + #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)] + clusterd_tolerations: Option>, + #[clap(long)] + balancerd_node_selector: Vec>, + #[clap(long, value_parser = parse_affinity)] + balancerd_affinity: Option, + #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)] + balancerd_tolerations: Option>, + #[clap(long, value_parser = parse_resources)] + balancerd_default_resources: Option, + #[clap(long)] + console_node_selector: Vec>, + #[clap(long, value_parser = parse_affinity)] + console_affinity: Option, + #[clap(long = "console-toleration", value_parser = parse_tolerations)] + console_tolerations: Option>, + #[clap(long, value_parser = parse_resources)] + console_default_resources: Option, + #[clap(long, default_value = "always", value_enum)] + image_pull_policy: KubernetesImagePullPolicy, + #[clap(long)] + network_policies_internal_enabled: bool, + #[clap(long)] + network_policies_ingress_enabled: bool, + #[clap(long)] + network_policies_ingress_cidrs: Vec, + #[clap(long)] + network_policies_egress_enabled: bool, + #[clap(long)] + network_policies_egress_cidrs: Vec, + + #[clap(long)] + environmentd_cluster_replica_sizes: Option, + #[clap(long)] + bootstrap_default_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_system_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_probe_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_support_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_catalog_server_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_analytics_cluster_replica_size: Option, + #[clap(long)] + bootstrap_builtin_system_cluster_replication_factor: Option, + #[clap(long)] + bootstrap_builtin_probe_cluster_replication_factor: Option, + #[clap(long)] + bootstrap_builtin_support_cluster_replication_factor: Option, + #[clap(long)] + bootstrap_builtin_analytics_cluster_replication_factor: Option, + + #[clap( + long, + default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"], + )] + environmentd_allowed_origins: Vec, + #[clap(long, default_value = "https://console.materialize.com")] + internal_console_proxy_url: String, + + #[clap(long, default_value = "6875")] + environmentd_sql_port: u16, + #[clap(long, default_value = "6876")] + environmentd_http_port: u16, + #[clap(long, default_value = "6877")] + environmentd_internal_sql_port: u16, + #[clap(long, default_value = "6878")] + environmentd_internal_http_port: u16, + #[clap(long, default_value = "6879")] + environmentd_internal_persist_pubsub_port: u16, + + #[clap(long, default_value = "6875")] + balancerd_sql_port: u16, + #[clap(long, default_value = "6876")] + balancerd_http_port: u16, + #[clap(long, default_value = "8080")] + balancerd_internal_http_port: u16, + + #[clap(long, default_value = "8080")] + console_http_port: u16, + + #[clap(long, default_value = "{}")] + default_certificate_specs: DefaultCertificateSpecs, + + #[clap(long, hide = true)] + disable_license_key_checks: bool, #[clap(flatten)] tracing: TracingCliArgs, @@ -55,6 +212,18 @@ pub struct Args { additional_crd_columns: Option>, } +fn parse_affinity(s: &str) -> anyhow::Result { + Ok(serde_json::from_str(s)?) +} + +fn parse_tolerations(s: &str) -> anyhow::Result { + Ok(serde_json::from_str(s)?) +} + +fn parse_resources(s: &str) -> anyhow::Result { + Ok(serde_json::from_str(s)?) +} + fn parse_crd_columns(val: &str) -> Result, serde_json::Error> { serde_json::from_str(val) } @@ -130,10 +299,90 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { || "materialize controller", k8s_controller::Controller::namespaced_all( client.clone(), - mz_orchestratord::controller::materialize::Context::new( - args.materialize_controller_args, - args.tracing, - namespace, + controller::materialize::Context::new( + controller::materialize::Config { + cloud_provider: args.cloud_provider, + region: args.region, + create_balancers: args.create_balancers, + create_console: args.create_console, + helm_chart_version: args.helm_chart_version, + secrets_controller: args.secrets_controller, + collect_pod_metrics: args.collect_pod_metrics, + enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations, + segment_api_key: args.segment_api_key, + segment_client_side: args.segment_client_side, + console_image_tag_default: args.console_image_tag_default, + console_image_tag_map: args.console_image_tag_map, + aws_account_id: args.aws_account_id, + environmentd_iam_role_arn: args.environmentd_iam_role_arn, + environmentd_connection_role_arn: args.environmentd_connection_role_arn, + aws_secrets_controller_tags: args.aws_secrets_controller_tags, + environmentd_availability_zones: args.environmentd_availability_zones, + ephemeral_volume_class: args.ephemeral_volume_class, + scheduler_name: args.scheduler_name, + enable_security_context: args.enable_security_context, + enable_internal_statement_logging: args.enable_internal_statement_logging, + disable_statement_logging: args.disable_statement_logging, + orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels, + environmentd_node_selector: args.environmentd_node_selector, + environmentd_affinity: args.environmentd_affinity, + environmentd_tolerations: args.environmentd_tolerations, + environmentd_default_resources: args.environmentd_default_resources, + clusterd_node_selector: args.clusterd_node_selector, + clusterd_affinity: args.clusterd_affinity, + clusterd_tolerations: args.clusterd_tolerations, + balancerd_node_selector: args.balancerd_node_selector, + balancerd_affinity: args.balancerd_affinity, + balancerd_tolerations: args.balancerd_tolerations, + balancerd_default_resources: args.balancerd_default_resources, + console_node_selector: args.console_node_selector, + console_affinity: args.console_affinity, + console_tolerations: args.console_tolerations, + console_default_resources: args.console_default_resources, + image_pull_policy: args.image_pull_policy, + network_policies_internal_enabled: args.network_policies_internal_enabled, + network_policies_ingress_enabled: args.network_policies_ingress_enabled, + network_policies_ingress_cidrs: args.network_policies_ingress_cidrs, + network_policies_egress_enabled: args.network_policies_egress_enabled, + network_policies_egress_cidrs: args.network_policies_egress_cidrs, + environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes, + bootstrap_default_cluster_replica_size: args + .bootstrap_default_cluster_replica_size, + bootstrap_builtin_system_cluster_replica_size: args + .bootstrap_builtin_system_cluster_replica_size, + bootstrap_builtin_probe_cluster_replica_size: args + .bootstrap_builtin_probe_cluster_replica_size, + bootstrap_builtin_support_cluster_replica_size: args + .bootstrap_builtin_support_cluster_replica_size, + bootstrap_builtin_catalog_server_cluster_replica_size: args + .bootstrap_builtin_catalog_server_cluster_replica_size, + bootstrap_builtin_analytics_cluster_replica_size: args + .bootstrap_builtin_analytics_cluster_replica_size, + bootstrap_builtin_system_cluster_replication_factor: args + .bootstrap_builtin_system_cluster_replication_factor, + bootstrap_builtin_probe_cluster_replication_factor: args + .bootstrap_builtin_probe_cluster_replication_factor, + bootstrap_builtin_support_cluster_replication_factor: args + .bootstrap_builtin_support_cluster_replication_factor, + bootstrap_builtin_analytics_cluster_replication_factor: args + .bootstrap_builtin_analytics_cluster_replication_factor, + environmentd_allowed_origins: args.environmentd_allowed_origins, + internal_console_proxy_url: args.internal_console_proxy_url, + environmentd_sql_port: args.environmentd_sql_port, + environmentd_http_port: args.environmentd_http_port, + environmentd_internal_sql_port: args.environmentd_internal_sql_port, + environmentd_internal_http_port: args.environmentd_internal_http_port, + environmentd_internal_persist_pubsub_port: args + .environmentd_internal_persist_pubsub_port, + balancerd_sql_port: args.balancerd_sql_port, + balancerd_http_port: args.balancerd_http_port, + balancerd_internal_http_port: args.balancerd_internal_http_port, + console_http_port: args.console_http_port, + default_certificate_specs: args.default_certificate_specs, + disable_license_key_checks: args.disable_license_key_checks, + tracing: args.tracing, + orchestratord_namespace: namespace, + }, Arc::clone(&metrics), client.clone(), ) diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 2163e3b159536..3bfb576be9684 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -9,14 +9,10 @@ use std::{ collections::BTreeSet, - fmt::Display, - future::ready, - str::FromStr, sync::{Arc, Mutex}, }; use anyhow::Context as _; -use futures::StreamExt; use http::HeaderValue; use k8s_openapi::{ api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration}, @@ -25,16 +21,18 @@ use k8s_openapi::{ use kube::{ Api, Client, Resource, ResourceExt, api::PostParams, - runtime::{controller::Action, reflector, watcher}, + runtime::{controller::Action, reflector}, }; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; use uuid::Uuid; -use crate::{controller::materialize::environmentd::V161, metrics::Metrics}; +use crate::{ + Error, controller::materialize::environmentd::V161, k8s::make_reflector, + matching_image_from_environmentd_image_ref, metrics::Metrics, tls::DefaultCertificateSpecs, +}; use mz_cloud_provider::CloudProvider; use mz_cloud_resources::crd::materialize::v1alpha1::{ - Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus, + Materialize, MaterializeRolloutStrategy, MaterializeStatus, }; use mz_license_keys::validate; use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; @@ -44,256 +42,113 @@ use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument}; pub mod balancer; pub mod console; pub mod environmentd; -pub mod tls; - -#[derive(clap::Parser)] -pub struct MaterializeControllerArgs { - #[clap(long)] - cloud_provider: CloudProvider, - #[clap(long)] - region: String, - #[clap(long)] - create_balancers: bool, - #[clap(long)] - create_console: bool, - #[clap(long)] - helm_chart_version: Option, - #[clap(long, default_value = "kubernetes")] - secrets_controller: String, - #[clap(long)] - collect_pod_metrics: bool, - #[clap(long)] - enable_prometheus_scrape_annotations: bool, - #[clap(long)] - disable_authentication: bool, - - #[clap(long)] - segment_api_key: Option, - #[clap(long)] - segment_client_side: bool, - - #[clap(long)] - console_image_tag_default: String, - #[clap(long)] - console_image_tag_map: Vec>, - - #[clap(flatten)] - aws_info: AwsInfo, - - #[clap(long)] - ephemeral_volume_class: Option, - #[clap(long)] - scheduler_name: Option, - #[clap(long)] - enable_security_context: bool, - #[clap(long)] - enable_internal_statement_logging: bool, - #[clap(long, default_value = "false")] - disable_statement_logging: bool, - - #[clap(long)] - orchestratord_pod_selector_labels: Vec>, - #[clap(long)] - environmentd_node_selector: Vec>, - #[clap(long, value_parser = parse_affinity)] - environmentd_affinity: Option, - #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)] - environmentd_tolerations: Option>, - #[clap(long, value_parser = parse_resources)] - environmentd_default_resources: Option, - #[clap(long)] - clusterd_node_selector: Vec>, - #[clap(long, value_parser = parse_affinity)] - clusterd_affinity: Option, - #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)] - clusterd_tolerations: Option>, - #[clap(long)] - balancerd_node_selector: Vec>, - #[clap(long, value_parser = parse_affinity)] - balancerd_affinity: Option, - #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)] - balancerd_tolerations: Option>, - #[clap(long, value_parser = parse_resources)] - balancerd_default_resources: Option, - #[clap(long)] - console_node_selector: Vec>, - #[clap(long, value_parser = parse_affinity)] - console_affinity: Option, - #[clap(long = "console-toleration", value_parser = parse_tolerations)] - console_tolerations: Option>, - #[clap(long, value_parser = parse_resources)] - console_default_resources: Option, - #[clap(long, default_value = "always", value_enum)] - image_pull_policy: KubernetesImagePullPolicy, - #[clap(flatten)] - network_policies: NetworkPolicyConfig, - - #[clap(long)] - environmentd_cluster_replica_sizes: Option, - #[clap(long)] - bootstrap_default_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_system_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_probe_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_support_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_catalog_server_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_analytics_cluster_replica_size: Option, - #[clap(long)] - bootstrap_builtin_system_cluster_replication_factor: Option, - #[clap(long)] - bootstrap_builtin_probe_cluster_replication_factor: Option, - #[clap(long)] - bootstrap_builtin_support_cluster_replication_factor: Option, - #[clap(long)] - bootstrap_builtin_analytics_cluster_replication_factor: Option, - - #[clap( - long, - default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"], - )] - environmentd_allowed_origins: Vec, - #[clap(long, default_value = "https://console.materialize.com")] - internal_console_proxy_url: String, - - #[clap(long, default_value = "6875")] - environmentd_sql_port: u16, - #[clap(long, default_value = "6876")] - environmentd_http_port: u16, - #[clap(long, default_value = "6877")] - environmentd_internal_sql_port: u16, - #[clap(long, default_value = "6878")] - environmentd_internal_http_port: u16, - #[clap(long, default_value = "6879")] - environmentd_internal_persist_pubsub_port: u16, - - #[clap(long, default_value = "6875")] - balancerd_sql_port: u16, - #[clap(long, default_value = "6876")] - balancerd_http_port: u16, - #[clap(long, default_value = "8080")] - balancerd_internal_http_port: u16, - - #[clap(long, default_value = "8080")] - console_http_port: u16, - - #[clap(long, default_value = "{}")] - default_certificate_specs: DefaultCertificateSpecs, - - #[clap(long, hide = true)] - disable_license_key_checks: bool, -} - -fn parse_affinity(s: &str) -> anyhow::Result { - Ok(serde_json::from_str(s)?) -} - -fn parse_tolerations(s: &str) -> anyhow::Result { - Ok(serde_json::from_str(s)?) -} - -fn parse_resources(s: &str) -> anyhow::Result { - Ok(serde_json::from_str(s)?) -} - -#[derive(Clone, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct DefaultCertificateSpecs { - balancerd_external: Option, - console_external: Option, - internal: Option, -} - -impl FromStr for DefaultCertificateSpecs { - type Err = serde_json::Error; - - fn from_str(s: &str) -> Result { - serde_json::from_str(s) - } -} - -#[derive(clap::Parser)] -pub struct AwsInfo { - #[clap(long)] - aws_account_id: Option, - #[clap(long)] - environmentd_iam_role_arn: Option, - #[clap(long)] - environmentd_connection_role_arn: Option, - #[clap(long)] - aws_secrets_controller_tags: Vec, - #[clap(long)] - environmentd_availability_zones: Option>, -} - -#[derive(clap::Parser)] -pub struct NetworkPolicyConfig { - #[clap(long = "network-policies-internal-enabled")] - internal_enabled: bool, - - #[clap(long = "network-policies-ingress-enabled")] - ingress_enabled: bool, - - #[clap(long = "network-policies-ingress-cidrs")] - ingress_cidrs: Vec, - - #[clap(long = "network-policies-egress-enabled")] - egress_enabled: bool, - #[clap(long = "network-policies-egress-cidrs")] - egress_cidrs: Vec, -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - Anyhow(#[from] anyhow::Error), - Kube(#[from] kube::Error), - Reqwest(#[from] reqwest::Error), -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Anyhow(e) => write!(f, "{e}"), - Self::Kube(e) => write!(f, "{e}"), - Self::Reqwest(e) => write!(f, "{e}"), - } - } +pub struct Config { + pub cloud_provider: CloudProvider, + pub region: String, + pub create_balancers: bool, + pub create_console: bool, + pub helm_chart_version: Option, + pub secrets_controller: String, + pub collect_pod_metrics: bool, + pub enable_prometheus_scrape_annotations: bool, + + pub segment_api_key: Option, + pub segment_client_side: bool, + + pub console_image_tag_default: String, + pub console_image_tag_map: Vec>, + + pub aws_account_id: Option, + pub environmentd_iam_role_arn: Option, + pub environmentd_connection_role_arn: Option, + pub aws_secrets_controller_tags: Vec, + pub environmentd_availability_zones: Option>, + + pub ephemeral_volume_class: Option, + pub scheduler_name: Option, + pub enable_security_context: bool, + pub enable_internal_statement_logging: bool, + pub disable_statement_logging: bool, + + pub orchestratord_pod_selector_labels: Vec>, + pub environmentd_node_selector: Vec>, + pub environmentd_affinity: Option, + pub environmentd_tolerations: Option>, + pub environmentd_default_resources: Option, + pub clusterd_node_selector: Vec>, + pub clusterd_affinity: Option, + pub clusterd_tolerations: Option>, + pub balancerd_node_selector: Vec>, + pub balancerd_affinity: Option, + pub balancerd_tolerations: Option>, + pub balancerd_default_resources: Option, + pub console_node_selector: Vec>, + pub console_affinity: Option, + pub console_tolerations: Option>, + pub console_default_resources: Option, + pub image_pull_policy: KubernetesImagePullPolicy, + pub network_policies_internal_enabled: bool, + pub network_policies_ingress_enabled: bool, + pub network_policies_ingress_cidrs: Vec, + pub network_policies_egress_enabled: bool, + pub network_policies_egress_cidrs: Vec, + + pub environmentd_cluster_replica_sizes: Option, + pub bootstrap_default_cluster_replica_size: Option, + pub bootstrap_builtin_system_cluster_replica_size: Option, + pub bootstrap_builtin_probe_cluster_replica_size: Option, + pub bootstrap_builtin_support_cluster_replica_size: Option, + pub bootstrap_builtin_catalog_server_cluster_replica_size: Option, + pub bootstrap_builtin_analytics_cluster_replica_size: Option, + pub bootstrap_builtin_system_cluster_replication_factor: Option, + pub bootstrap_builtin_probe_cluster_replication_factor: Option, + pub bootstrap_builtin_support_cluster_replication_factor: Option, + pub bootstrap_builtin_analytics_cluster_replication_factor: Option, + + pub environmentd_allowed_origins: Vec, + pub internal_console_proxy_url: String, + + pub environmentd_sql_port: u16, + pub environmentd_http_port: u16, + pub environmentd_internal_sql_port: u16, + pub environmentd_internal_http_port: u16, + pub environmentd_internal_persist_pubsub_port: u16, + + pub balancerd_sql_port: u16, + pub balancerd_http_port: u16, + pub balancerd_internal_http_port: u16, + + pub console_http_port: u16, + + pub default_certificate_specs: DefaultCertificateSpecs, + + pub disable_license_key_checks: bool, + + pub tracing: TracingCliArgs, + pub orchestratord_namespace: String, } pub struct Context { - config: MaterializeControllerArgs, - tracing: TracingCliArgs, - orchestratord_namespace: String, + config: Config, metrics: Arc, materializes: reflector::Store, needs_update: Arc>>, } impl Context { - pub async fn new( - config: MaterializeControllerArgs, - tracing: TracingCliArgs, - orchestratord_namespace: String, - metrics: Arc, - client: kube::Client, - ) -> Self { + pub async fn new(config: Config, metrics: Arc, client: kube::Client) -> Self { if config.cloud_provider == CloudProvider::Aws { assert!( - config.aws_info.aws_account_id.is_some(), + config.aws_account_id.is_some(), "--aws-account-id is required when using --cloud-provider=aws" ); } Self { config, - tracing, - orchestratord_namespace, metrics, - materializes: Self::make_reflector(client.clone()).await, + materializes: make_reflector(client.clone()).await, needs_update: Default::default(), } } @@ -408,40 +263,6 @@ impl Context { Ok(()) } - - async fn make_reflector(client: Client) -> reflector::Store - where - K: kube::Resource - + Clone - + Send - + Sync - + DeserializeOwned - + Serialize - + std::fmt::Debug - + 'static, - { - let api = kube::Api::all(client); - let (store, writer) = reflector::store(); - let reflector = - reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29))); - mz_ore::task::spawn( - || format!("{} reflector", K::kind(&Default::default())), - async { - reflector - .for_each(|res| { - if let Err(e) = res { - warn!("error in {} reflector: {}", K::kind(&Default::default()), e); - } - ready(()) - }) - .await - }, - ); - // the only way this can return an error is if we drop the writer, - // which we do not ever do, so unwrap is fine - store.wait_until_ready().await.unwrap(); - store - } } #[async_trait::async_trait] @@ -449,7 +270,8 @@ impl k8s_controller::Context for Context { type Resource = Materialize; type Error = Error; - const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize"; + const FINALIZER_NAME: Option<&'static str> = + Some("orchestratord.materialize.cloud/materialize"); #[instrument(fields(organization_name=mz.name_unchecked()))] async fn apply( @@ -549,13 +371,8 @@ impl k8s_controller::Context for Context { // have been applied earlier, but we don't want to use these // environment resources because when we apply them, we want to apply // them with data that uses the new generation - let active_resources = environmentd::Resources::new( - &self.config, - &self.tracing, - &self.orchestratord_namespace, - mz, - status.active_generation, - ); + let active_resources = + environmentd::Resources::new(&self.config, mz, status.active_generation); let has_current_changes = status.resources_hash != active_resources.generate_hash(); let active_generation = status.active_generation; let next_generation = active_generation + 1; @@ -567,13 +384,7 @@ impl k8s_controller::Context for Context { // here we regenerate the environment resources using the // same inputs except with an updated generation - let resources = environmentd::Resources::new( - &self.config, - &self.tracing, - &self.orchestratord_namespace, - mz, - desired_generation, - ); + let resources = environmentd::Resources::new(&self.config, mz, desired_generation); let resources_hash = resources.generate_hash(); let mut result = match ( @@ -919,21 +730,3 @@ impl k8s_controller::Context for Context { Ok(None) } } - -fn matching_image_from_environmentd_image_ref( - environmentd_image_ref: &str, - image_name: &str, - image_tag: Option<&str>, -) -> String { - let namespace = environmentd_image_ref - .rsplit_once('/') - .unwrap_or(("materialize", "")) - .0; - let tag = image_tag.unwrap_or_else(|| { - environmentd_image_ref - .rsplit_once(':') - .unwrap_or(("", "unstable")) - .1 - }); - format!("{namespace}/{image_name}:{tag}") -} diff --git a/src/orchestratord/src/controller/materialize/balancer.rs b/src/orchestratord/src/controller/materialize/balancer.rs index 70b2b8f9497f2..247221351ad5c 100644 --- a/src/orchestratord/src/controller/materialize/balancer.rs +++ b/src/orchestratord/src/controller/materialize/balancer.rs @@ -25,13 +25,12 @@ use maplit::btreemap; use tracing::trace; use crate::{ - controller::materialize::{ - Error, matching_image_from_environmentd_image_ref, - tls::{create_certificate, issuer_ref_defined}, - }, + controller::materialize::{Error, matching_image_from_environmentd_image_ref}, k8s::{apply_resource, delete_resource, get_resource}, + tls::{create_certificate, issuer_ref_defined}, }; use mz_cloud_resources::crd::{ + ManagedResource, generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, materialize::v1alpha1::Materialize, }; @@ -44,7 +43,7 @@ pub struct Resources { } impl Resources { - pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self { + pub fn new(config: &super::Config, mz: &Materialize) -> Self { let balancerd_external_certificate = Box::new(create_balancerd_external_certificate(config, mz)); let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz)); @@ -118,7 +117,7 @@ impl Resources { } fn create_balancerd_external_certificate( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, ) -> Option { create_certificate( @@ -133,10 +132,7 @@ fn create_balancerd_external_certificate( ) } -fn create_balancerd_deployment_object( - config: &super::MaterializeControllerArgs, - mz: &Materialize, -) -> Deployment { +fn create_balancerd_deployment_object(config: &super::Config, mz: &Materialize) -> Deployment { let security_context = if config.enable_security_context { // Since we want to adhere to the most restrictive security context, all // of these fields have to be set how they are. @@ -378,10 +374,7 @@ fn create_balancerd_deployment_object( } } -fn create_balancerd_service_object( - config: &super::MaterializeControllerArgs, - mz: &Materialize, -) -> Service { +fn create_balancerd_service_object(config: &super::Config, mz: &Materialize) -> Service { let selector = btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()}; diff --git a/src/orchestratord/src/controller/materialize/console.rs b/src/orchestratord/src/controller/materialize/console.rs index b3c42f2a8e6ec..18c1a2631c42f 100644 --- a/src/orchestratord/src/controller/materialize/console.rs +++ b/src/orchestratord/src/controller/materialize/console.rs @@ -30,10 +30,11 @@ use serde::Serialize; use tracing::trace; use crate::{ - controller::materialize::tls::{create_certificate, issuer_ref_defined}, k8s::{apply_resource, delete_resource}, + tls::{create_certificate, issuer_ref_defined}, }; use mz_cloud_resources::crd::{ + ManagedResource, generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, materialize::v1alpha1::Materialize, }; @@ -47,11 +48,7 @@ pub struct Resources { } impl Resources { - pub fn new( - config: &super::MaterializeControllerArgs, - mz: &Materialize, - console_image_ref: &str, - ) -> Self { + pub fn new(config: &super::Config, mz: &Materialize, console_image_ref: &str) -> Self { let network_policies = create_network_policies(config, mz); let console_configmap = Box::new(create_console_app_configmap_object(mz, console_image_ref)); @@ -139,12 +136,9 @@ impl Resources { } } -fn create_network_policies( - config: &super::MaterializeControllerArgs, - mz: &Materialize, -) -> Vec { +fn create_network_policies(config: &super::Config, mz: &Materialize) -> Vec { let mut network_policies = Vec::new(); - if config.network_policies.ingress_enabled { + if config.network_policies_ingress_enabled { let console_label_selector = LabelSelector { match_labels: Some( mz.default_labels() @@ -160,8 +154,7 @@ fn create_network_policies( ingress: Some(vec![NetworkPolicyIngressRule { from: Some( config - .network_policies - .ingress_cidrs + .network_policies_ingress_cidrs .iter() .map(|cidr| NetworkPolicyPeer { ip_block: Some(IPBlock { @@ -189,7 +182,7 @@ fn create_network_policies( } fn create_console_external_certificate( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, ) -> Option { create_certificate( @@ -239,7 +232,7 @@ fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str } fn create_console_deployment_object( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, console_image_ref: &str, ) -> Deployment { @@ -444,10 +437,7 @@ ssl_certificate_key /nginx/tls/tls.key;", } } -fn create_console_service_object( - config: &super::MaterializeControllerArgs, - mz: &Materialize, -) -> Service { +fn create_console_service_object(config: &super::Config, mz: &Materialize) -> Service { let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()}; let ports = vec![ServicePort { diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs index df72038b89879..59f52e4412c1a 100644 --- a/src/orchestratord/src/controller/materialize/environmentd.rs +++ b/src/orchestratord/src/controller/materialize/environmentd.rs @@ -46,14 +46,14 @@ use tracing::{error, trace, warn}; use super::Error; use super::matching_image_from_environmentd_image_ref; -use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined}; use crate::k8s::{apply_resource, delete_resource, get_resource}; +use crate::tls::{create_certificate, issuer_ref_defined}; use mz_cloud_provider::CloudProvider; -use mz_cloud_resources::crd::generated::cert_manager::certificates::{ - Certificate, CertificatePrivateKeyAlgorithm, -}; use mz_cloud_resources::crd::materialize::v1alpha1::Materialize; -use mz_orchestrator_tracing::TracingCliArgs; +use mz_cloud_resources::crd::{ + ManagedResource, + generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, +}; use mz_ore::instrument; static V140_DEV0: LazyLock = LazyLock::new(|| Version { @@ -134,15 +134,8 @@ pub struct Resources { } impl Resources { - pub fn new( - config: &super::MaterializeControllerArgs, - tracing: &TracingCliArgs, - orchestratord_namespace: &str, - mz: &Materialize, - generation: u64, - ) -> Self { - let environmentd_network_policies = - create_environmentd_network_policies(config, mz, orchestratord_namespace); + pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self { + let environmentd_network_policies = create_environmentd_network_policies(config, mz); let service_account = Box::new(create_service_account_object(config, mz)); let role = Box::new(create_role_object(mz)); @@ -153,7 +146,7 @@ impl Resources { Box::new(create_persist_pubsub_service(config, mz, generation)); let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz)); let environmentd_statefulset = Box::new(create_environmentd_statefulset_object( - config, tracing, mz, generation, + config, mz, generation, )); let connection_info = Box::new(create_connection_info(config, mz, generation)); @@ -471,12 +464,11 @@ impl Resources { } fn create_environmentd_network_policies( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, - orchestratord_namespace: &str, ) -> Vec { let mut network_policies = Vec::new(); - if config.network_policies.internal_enabled { + if config.network_policies_internal_enabled { let environmentd_label_selector = LabelSelector { match_labels: Some( mz.default_labels() @@ -542,7 +534,7 @@ fn create_environmentd_network_policies( namespace_selector: Some(LabelSelector { match_labels: Some(btreemap! { "kubernetes.io/metadata.name".into() - => orchestratord_namespace.into(), + => config.orchestratord_namespace.clone(), }), ..Default::default() }), @@ -572,7 +564,7 @@ fn create_environmentd_network_policies( }, ]); } - if config.network_policies.ingress_enabled { + if config.network_policies_ingress_enabled { let mut ingress_label_selector = mz.default_labels(); ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name()); network_policies.extend([NetworkPolicy { @@ -581,8 +573,7 @@ fn create_environmentd_network_policies( ingress: Some(vec![NetworkPolicyIngressRule { from: Some( config - .network_policies - .ingress_cidrs + .network_policies_ingress_cidrs .iter() .map(|cidr| NetworkPolicyPeer { ip_block: Some(IPBlock { @@ -616,15 +607,14 @@ fn create_environmentd_network_policies( }), }]); } - if config.network_policies.egress_enabled { + if config.network_policies_egress_enabled { network_policies.extend([NetworkPolicy { metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")), spec: Some(NetworkPolicySpec { egress: Some(vec![NetworkPolicyEgressRule { to: Some( config - .network_policies - .egress_cidrs + .network_policies_egress_cidrs .iter() .map(|cidr| NetworkPolicyPeer { ip_block: Some(IPBlock { @@ -650,7 +640,7 @@ fn create_environmentd_network_policies( } fn create_service_account_object( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, ) -> Option { if mz.create_service_account() { @@ -664,7 +654,7 @@ fn create_service_account_object( mz.spec .environmentd_iam_role_arn .as_deref() - .or(config.aws_info.environmentd_iam_role_arn.as_deref()), + .or(config.environmentd_iam_role_arn.as_deref()), ) { warn!( "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead." @@ -785,7 +775,7 @@ fn create_role_binding_object(mz: &Materialize) -> RoleBinding { } fn create_public_service_object( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, generation: u64, ) -> Service { @@ -793,7 +783,7 @@ fn create_public_service_object( } fn create_generation_service_object( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, generation: u64, ) -> Service { @@ -806,7 +796,7 @@ fn create_generation_service_object( } fn create_base_service_object( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, generation: u64, service_name: &str, @@ -856,7 +846,7 @@ fn create_base_service_object( } fn create_persist_pubsub_service( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, generation: u64, ) -> Service { @@ -881,7 +871,7 @@ fn create_persist_pubsub_service( } fn create_environmentd_certificate( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, ) -> Option { create_certificate( @@ -900,8 +890,7 @@ fn create_environmentd_certificate( } fn create_environmentd_statefulset_object( - config: &super::MaterializeControllerArgs, - tracing: &TracingCliArgs, + config: &super::Config, mz: &Materialize, generation: u64, ) -> StatefulSet { @@ -1072,7 +1061,7 @@ fn create_environmentd_statefulset_object( // Add AWS arguments. if config.cloud_provider == CloudProvider::Aws { - if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() { + if let Some(azs) = config.environmentd_availability_zones.as_ref() { for az in azs { args.push(format!("--availability-zone={az}")); } @@ -1082,14 +1071,14 @@ fn create_environmentd_statefulset_object( .spec .environmentd_connection_role_arn .as_deref() - .or(config.aws_info.environmentd_connection_role_arn.as_deref()) + .or(config.environmentd_connection_role_arn.as_deref()) { args.push(format!( "--aws-connection-role-arn={}", environmentd_connection_role_arn )); } - if let Some(account_id) = &config.aws_info.aws_account_id { + if let Some(account_id) = &config.aws_account_id { args.push(format!("--aws-account-id={account_id}")); } @@ -1097,7 +1086,7 @@ fn create_environmentd_statefulset_object( "--aws-secrets-controller-tags=Environment={}", mz.name_unchecked() )]); - args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags); + args.extend_from_slice(&config.aws_secrets_controller_tags); } // Add Kubernetes arguments. @@ -1171,7 +1160,7 @@ fn create_environmentd_statefulset_object( // Add logging and tracing arguments. args.extend(["--log-format=json".into()]); - if let Some(endpoint) = &tracing.opentelemetry_endpoint { + if let Some(endpoint) = &config.tracing.opentelemetry_endpoint { args.push(format!("--opentelemetry-endpoint={}", endpoint)); } // --opentelemetry-resource also configures sentry tags @@ -1233,9 +1222,9 @@ fn create_environmentd_statefulset_object( args.push("--orchestrator-kubernetes-service-fs-group=999".to_string()); // Add Sentry arguments. - if let Some(sentry_dsn) = &tracing.sentry_dsn { + if let Some(sentry_dsn) = &config.tracing.sentry_dsn { args.push(format!("--sentry-dsn={}", sentry_dsn)); - if let Some(sentry_environment) = &tracing.sentry_environment { + if let Some(sentry_environment) = &config.tracing.sentry_environment { args.push(format!("--sentry-environment={}", sentry_environment)); } args.push(format!("--sentry-tag=region={}", config.region)); @@ -1618,7 +1607,7 @@ fn create_environmentd_statefulset_object( } fn create_connection_info( - config: &super::MaterializeControllerArgs, + config: &super::Config, mz: &Materialize, generation: u64, ) -> ConnectionInfo { diff --git a/src/orchestratord/src/k8s.rs b/src/orchestratord/src/k8s.rs index a83fa8b5233be..dc7a9e501efa9 100644 --- a/src/orchestratord/src/k8s.rs +++ b/src/orchestratord/src/k8s.rs @@ -7,16 +7,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::time::Duration; +use std::{future::ready, time::Duration}; +use futures::StreamExt; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition; use kube::{ Api, Client, CustomResourceExt, Resource, ResourceExt, api::{DeleteParams, Patch, PatchParams}, + runtime::{reflector, watcher}, }; -use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds}; use serde::{Serialize, de::DeserializeOwned}; -use tracing::info; +use tracing::{info, warn}; + +use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds}; const FIELD_MANAGER: &str = "orchestratord.materialize.cloud"; @@ -105,3 +108,37 @@ pub async fn register_crds( Ok(()) } + +pub async fn make_reflector(client: Client) -> reflector::Store +where + K: kube::Resource + + Clone + + Send + + Sync + + DeserializeOwned + + Serialize + + std::fmt::Debug + + 'static, +{ + let api = kube::Api::all(client); + let (store, writer) = reflector::store(); + let reflector = + reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29))); + mz_ore::task::spawn( + || format!("{} reflector", K::kind(&Default::default())), + async { + reflector + .for_each(|res| { + if let Err(e) = res { + warn!("error in {} reflector: {}", K::kind(&Default::default()), e); + } + ready(()) + }) + .await + }, + ); + // the only way this can return an error is if we drop the writer, + // which we do not ever do, so unwrap is fine + store.wait_until_ready().await.expect("writer dropped"); + store +} diff --git a/src/orchestratord/src/lib.rs b/src/orchestratord/src/lib.rs index 09dc0c1869219..85fc908fe8a4b 100644 --- a/src/orchestratord/src/lib.rs +++ b/src/orchestratord/src/lib.rs @@ -7,6 +7,44 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt::Display; + pub mod controller; pub mod k8s; pub mod metrics; +pub mod tls; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + Anyhow(#[from] anyhow::Error), + Kube(#[from] kube::Error), + Reqwest(#[from] reqwest::Error), +} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Anyhow(e) => write!(f, "{e}"), + Self::Kube(e) => write!(f, "{e}"), + Self::Reqwest(e) => write!(f, "{e}"), + } + } +} + +pub fn matching_image_from_environmentd_image_ref( + environmentd_image_ref: &str, + image_name: &str, + image_tag: Option<&str>, +) -> String { + let namespace = environmentd_image_ref + .rsplit_once('/') + .unwrap_or(("materialize", "")) + .0; + let tag = image_tag.unwrap_or_else(|| { + environmentd_image_ref + .rsplit_once(':') + .unwrap_or(("", "unstable")) + .1 + }); + format!("{namespace}/{image_name}:{tag}") +} diff --git a/src/orchestratord/src/controller/materialize/tls.rs b/src/orchestratord/src/tls.rs similarity index 73% rename from src/orchestratord/src/controller/materialize/tls.rs rename to src/orchestratord/src/tls.rs index bbfeae9b8334b..342572d41c04e 100644 --- a/src/orchestratord/src/controller/materialize/tls.rs +++ b/src/orchestratord/src/tls.rs @@ -7,15 +7,37 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use mz_cloud_resources::crd::generated::cert_manager::certificates::{ - Certificate, CertificatePrivateKey, CertificatePrivateKeyAlgorithm, - CertificatePrivateKeyEncoding, CertificatePrivateKeyRotationPolicy, CertificateSpec, +use std::str::FromStr; + +use serde::Deserialize; + +use mz_cloud_resources::crd::{ + ManagedResource, MaterializeCertSpec, + generated::cert_manager::certificates::{ + Certificate, CertificatePrivateKey, CertificatePrivateKeyAlgorithm, + CertificatePrivateKeyEncoding, CertificatePrivateKeyRotationPolicy, CertificateSpec, + }, }; -use mz_cloud_resources::crd::materialize::v1alpha1::{Materialize, MaterializeCertSpec}; + +#[derive(Clone, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct DefaultCertificateSpecs { + pub balancerd_external: Option, + pub console_external: Option, + pub internal: Option, +} + +impl FromStr for DefaultCertificateSpecs { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} pub fn create_certificate( default_spec: Option, - mz: &Materialize, + resource: &impl ManagedResource, mz_cert_spec: Option, cert_name: String, secret_name: String, @@ -37,7 +59,7 @@ pub fn create_certificate( .labels .unwrap_or_default() .into_iter() - .chain(mz.default_labels()) + .chain(resource.default_labels()) .collect(), ); let mut dns_names = mz_cert_spec @@ -48,7 +70,7 @@ pub fn create_certificate( dns_names.extend(names); } Some(Certificate { - metadata: mz.managed_resource_meta(cert_name), + metadata: resource.managed_resource_meta(cert_name), spec: CertificateSpec { dns_names: Some(dns_names), duration: mz_cert_spec.duration.or(default_spec.duration),