diff --git a/misc/helm-charts/operator/templates/clusterrole.yaml b/misc/helm-charts/operator/templates/clusterrole.yaml index 531b74053e676..b42ec630fd608 100644 --- a/misc/helm-charts/operator/templates/clusterrole.yaml +++ b/misc/helm-charts/operator/templates/clusterrole.yaml @@ -88,6 +88,8 @@ rules: resources: - materializes - materializes/status + - balancers + - balancers/status - vpcendpoints verbs: - create diff --git a/src/cloud-resources/src/crd.rs b/src/cloud-resources/src/crd.rs index 503fde8169b3d..d5f6866b30ace 100644 --- a/src/cloud-resources/src/crd.rs +++ b/src/cloud-resources/src/crd.rs @@ -31,6 +31,7 @@ use crate::crd::generated::cert_manager::certificates::{ }; use mz_ore::retry::Retry; +pub mod balancer; pub mod generated; pub mod materialize; #[cfg(feature = "vpc-endpoints")] diff --git a/src/cloud-resources/src/crd/balancer.rs b/src/cloud-resources/src/crd/balancer.rs new file mode 100644 index 0000000000000..7fb5f623bf5e4 --- /dev/null +++ b/src/cloud-resources/src/crd/balancer.rs @@ -0,0 +1,159 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use anyhow::bail; +use k8s_openapi::{ + api::core::v1::ResourceRequirements, apimachinery::pkg::apis::meta::v1::Condition, +}; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id}; + +pub mod v1alpha1 { + use super::*; + + #[derive(Clone, Debug)] + pub enum Routing<'a> { + Static(&'a StaticRoutingConfig), + Frontegg(&'a FronteggRoutingConfig), + } + + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)] + #[serde(rename_all = "camelCase")] + pub struct StaticRoutingConfig { + pub environmentd_namespace: String, + pub environmentd_service_name: String, + } + + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)] + #[serde(rename_all = "camelCase")] + pub struct FronteggRoutingConfig { + // TODO + } + + #[derive( + CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema, + )] + #[serde(rename_all = "camelCase")] + #[kube( + namespaced, + group = "materialize.cloud", + version = "v1alpha1", + kind = "Balancer", + singular = "balancer", + plural = "balancers", + status = "BalancerStatus", + printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.balancerdImageRef", "priority": 1}"#, + printcolumn = r#"{"name": "Ready", "type": "string", "description": "Whether the deployment is ready", "jsonPath": ".status.conditions[?(@.type==\"Ready\")].status", "priority": 1}"# + )] + pub struct BalancerSpec { + /// The balancerd image to run. + pub balancerd_image_ref: String, + // Resource requirements for the balancerd pod + pub resource_requirements: Option, + // Number of balancerd pods to create + pub replicas: Option, + // The configuration for generating an x509 certificate using cert-manager for balancerd + // to present to incoming connections. + // The dns_names and issuer_ref fields are required. + pub external_certificate_spec: Option, + // The configuration for generating an x509 certificate using cert-manager for balancerd + // to use to communicate with environmentd. + // The dns_names and issuer_ref fields are required. + pub internal_certificate_spec: Option, + // Annotations to apply to the pods + pub pod_annotations: Option>, + // Labels to apply to the pods + pub pod_labels: Option>, + + // Configuration for statically routing traffic + pub static_routing: Option, + // Configuration for routing traffic via Frontegg + pub frontegg_routing: Option, + + // This can be set to override the randomly chosen resource id + pub resource_id: Option, + } + + impl Balancer { + pub fn name_prefixed(&self, suffix: &str) -> String { + format!("mz{}-{}", self.resource_id(), suffix) + } + + pub fn resource_id(&self) -> &str { + &self.status.as_ref().unwrap().resource_id + } + + pub fn deployment_name(&self) -> String { + self.name_prefixed("balancerd") + } + + pub fn replicas(&self) -> i32 { + self.spec.replicas.unwrap_or(2) + } + + pub fn app_name(&self) -> String { + "balancerd".to_owned() + } + + pub fn service_name(&self) -> String { + self.name_prefixed("balancerd") + } + + pub fn external_certificate_name(&self) -> String { + self.name_prefixed("balancerd-external") + } + + pub fn external_certificate_secret_name(&self) -> String { + self.name_prefixed("balancerd-external-tls") + } + + pub fn routing(&self) -> anyhow::Result> { + match (&self.spec.static_routing, &self.spec.frontegg_routing) { + (Some(config), None) => Ok(Routing::Static(config)), + (None, Some(config)) => Ok(Routing::Frontegg(config)), + (None, None) => bail!("no routing configuration present"), + _ => bail!("multiple routing configurations present"), + } + } + + pub fn status(&self) -> BalancerStatus { + self.status.clone().unwrap_or_else(|| BalancerStatus { + resource_id: self + .spec + .resource_id + .clone() + .unwrap_or_else(new_resource_id), + conditions: vec![], + }) + } + } + + #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)] + #[serde(rename_all = "camelCase")] + pub struct BalancerStatus { + /// Resource identifier used as a name prefix to avoid pod name collisions. + pub resource_id: String, + + pub conditions: Vec, + } + + impl ManagedResource for Balancer { + fn default_labels(&self) -> BTreeMap { + BTreeMap::from_iter([( + "materialize.cloud/mz-resource-id".to_owned(), + self.resource_id().to_owned(), + )]) + } + } +} diff --git a/src/orchestratord/src/bin/orchestratord.rs b/src/orchestratord/src/bin/orchestratord.rs index 6f6b0dcebab65..8c83aaf898208 100644 --- a/src/orchestratord/src/bin/orchestratord.rs +++ b/src/orchestratord/src/bin/orchestratord.rs @@ -15,11 +15,15 @@ use std::{ use http::HeaderValue; use k8s_openapi::{ - api::core::v1::{Affinity, ResourceRequirements, Toleration}, + api::{ + apps::v1::Deployment, + core::v1::{Affinity, ResourceRequirements, Service, Toleration}, + }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition, }; -use kube::runtime::watcher; +use kube::{Api, runtime::watcher}; use mz_cloud_provider::CloudProvider; +use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate; use tracing::info; use mz_build_info::{BuildInfo, build_info}; @@ -318,7 +322,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { aws_secrets_controller_tags: args.aws_secrets_controller_tags, environmentd_availability_zones: args.environmentd_availability_zones, ephemeral_volume_class: args.ephemeral_volume_class, - scheduler_name: args.scheduler_name, + scheduler_name: args.scheduler_name.clone(), enable_security_context: args.enable_security_context, enable_internal_statement_logging: args.enable_internal_statement_logging, disable_statement_logging: args.disable_statement_logging, @@ -330,10 +334,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { clusterd_node_selector: args.clusterd_node_selector, clusterd_affinity: args.clusterd_affinity, clusterd_tolerations: args.clusterd_tolerations, - balancerd_node_selector: args.balancerd_node_selector, - balancerd_affinity: args.balancerd_affinity, - balancerd_tolerations: args.balancerd_tolerations, - balancerd_default_resources: args.balancerd_default_resources, console_node_selector: args.console_node_selector, console_affinity: args.console_affinity, console_tolerations: args.console_tolerations, @@ -373,11 +373,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { environmentd_internal_http_port: args.environmentd_internal_http_port, environmentd_internal_persist_pubsub_port: args .environmentd_internal_persist_pubsub_port, - balancerd_sql_port: args.balancerd_sql_port, balancerd_http_port: args.balancerd_http_port, - balancerd_internal_http_port: args.balancerd_internal_http_port, console_http_port: args.console_http_port, - default_certificate_specs: args.default_certificate_specs, + default_certificate_specs: args.default_certificate_specs.clone(), disable_license_key_checks: args.disable_license_key_checks, tracing: args.tracing, orchestratord_namespace: namespace, @@ -391,6 +389,56 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { .run(), ); + mz_ore::task::spawn( + || "balancer controller", + k8s_controller::Controller::namespaced_all( + client.clone(), + controller::balancer::Context::new( + controller::balancer::Config { + enable_security_context: args.enable_security_context, + enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations, + image_pull_policy: args.image_pull_policy, + scheduler_name: args.scheduler_name, + balancerd_node_selector: args.balancerd_node_selector, + balancerd_affinity: args.balancerd_affinity, + balancerd_tolerations: args.balancerd_tolerations, + balancerd_default_resources: args.balancerd_default_resources, + default_certificate_specs: args.default_certificate_specs, + environmentd_sql_port: args.environmentd_sql_port, + environmentd_http_port: args.environmentd_http_port, + balancerd_sql_port: args.balancerd_sql_port, + balancerd_http_port: args.balancerd_http_port, + balancerd_internal_http_port: args.balancerd_internal_http_port, + }, + client.clone(), + ) + .await, + watcher::Config::default().timeout(29), + ) + .with_controller(|controller| { + controller + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + }) + .run(), + ); + info!("All tasks started successfully."); future::pending().await diff --git a/src/orchestratord/src/controller.rs b/src/orchestratord/src/controller.rs index 172fa788d7e51..3ada1742d52d0 100644 --- a/src/orchestratord/src/controller.rs +++ b/src/orchestratord/src/controller.rs @@ -7,4 +7,5 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub mod balancer; pub mod materialize; diff --git a/src/orchestratord/src/controller/balancer.rs b/src/orchestratord/src/controller/balancer.rs new file mode 100644 index 0000000000000..ad71ae4442026 --- /dev/null +++ b/src/orchestratord/src/controller/balancer.rs @@ -0,0 +1,503 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::bail; +use k8s_openapi::{ + api::{ + apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment}, + core::v1::{ + Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, + PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile, + SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration, + Volume, VolumeMount, + }, + }, + apimachinery::pkg::{ + apis::meta::v1::{Condition, LabelSelector, Time}, + util::intstr::IntOrString, + }, +}; +use kube::{ + Api, Client, Resource, ResourceExt, + api::{ObjectMeta, PostParams}, + runtime::{ + controller::Action, + reflector::{ObjectRef, Store}, + }, +}; +use maplit::btreemap; +use tracing::trace; + +use crate::{ + DefaultCertificateSpecs, Error, + k8s::{apply_resource, make_reflector}, + tls::{create_certificate, issuer_ref_defined}, +}; +use mz_cloud_resources::crd::{ + ManagedResource, + balancer::v1alpha1::{Balancer, Routing}, + generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, +}; +use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; +use mz_ore::{cli::KeyValueArg, instrument}; + +pub struct Config { + pub enable_security_context: bool, + pub enable_prometheus_scrape_annotations: bool, + + pub image_pull_policy: KubernetesImagePullPolicy, + pub scheduler_name: Option, + pub balancerd_node_selector: Vec>, + pub balancerd_affinity: Option, + pub balancerd_tolerations: Option>, + pub balancerd_default_resources: Option, + + pub default_certificate_specs: DefaultCertificateSpecs, + + pub environmentd_sql_port: u16, + pub environmentd_http_port: u16, + pub balancerd_sql_port: u16, + pub balancerd_http_port: u16, + pub balancerd_internal_http_port: u16, +} + +pub struct Context { + config: Config, + deployments: Store, +} + +impl Context { + pub async fn new(config: Config, client: Client) -> Self { + Self { + config, + deployments: make_reflector(client).await, + } + } + + async fn sync_deployment_status( + &self, + client: &Client, + balancer: &Balancer, + ) -> Result<(), kube::Error> { + let namespace = balancer.namespace().unwrap(); + let balancer_api: Api = Api::namespaced(client.clone(), &namespace); + + let Some(deployment) = self + .deployments + .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace)) + else { + return Ok(()); + }; + + let Some(deployment_conditions) = &deployment + .status + .as_ref() + .and_then(|status| status.conditions.as_ref()) + else { + // if the deployment doesn't have any conditions set yet, there + // is nothing to sync + return Ok(()); + }; + + let ready = deployment_conditions + .iter() + .any(|condition| condition.type_ == "Available" && condition.status == "True"); + let ready_str = if ready { "True" } else { "False" }; + + let mut status = balancer.status.clone().unwrap(); + if status + .conditions + .iter() + .any(|condition| condition.type_ == "Ready" && condition.status == ready_str) + { + // if the deployment status is already set correctly, we don't + // need to set it again (this prevents us from getting stuck in + // a reconcile loop) + return Ok(()); + } + + status.conditions = vec![Condition { + type_: "Ready".to_string(), + status: ready_str.to_string(), + last_transition_time: Time(chrono::offset::Utc::now()), + message: format!( + "balancerd deployment is{} ready", + if ready { "" } else { " not" } + ), + observed_generation: None, + reason: "DeploymentStatus".to_string(), + }]; + let mut new_balancer = balancer.clone(); + new_balancer.status = Some(status); + + balancer_api + .replace_status( + &balancer.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&new_balancer).unwrap(), + ) + .await?; + + Ok(()) + } + + fn create_external_certificate_object(&self, balancer: &Balancer) -> Option { + create_certificate( + self.config + .default_certificate_specs + .balancerd_external + .clone(), + balancer, + balancer.spec.external_certificate_spec.clone(), + balancer.external_certificate_name(), + balancer.external_certificate_secret_name(), + None, + CertificatePrivateKeyAlgorithm::Rsa, + Some(4096), + ) + } + + fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result { + let security_context = if self.config.enable_security_context { + // Since we want to adhere to the most restrictive security context, all + // of these fields have to be set how they are. + // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted + Some(SecurityContext { + run_as_non_root: Some(true), + capabilities: Some(Capabilities { + drop: Some(vec!["ALL".to_string()]), + ..Default::default() + }), + seccomp_profile: Some(SeccompProfile { + type_: "RuntimeDefault".to_string(), + ..Default::default() + }), + allow_privilege_escalation: Some(false), + ..Default::default() + }) + } else { + None + }; + + let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations { + Some(btreemap! { + "prometheus.io/scrape".to_owned() => "true".to_string(), + "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(), + "prometheus.io/path".to_owned() => "/metrics".to_string(), + "prometheus.io/scheme".to_owned() => "http".to_string(), + }) + } else { + None + }; + let mut pod_template_labels = balancer.default_labels(); + pod_template_labels.insert( + "materialize.cloud/name".to_owned(), + balancer.deployment_name(), + ); + pod_template_labels.insert("app".to_owned(), "balancerd".to_string()); + pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name()); + + let ports = vec![ + ContainerPort { + container_port: self.config.balancerd_sql_port.into(), + name: Some("pgwire".into()), + protocol: Some("TCP".into()), + ..Default::default() + }, + ContainerPort { + container_port: self.config.balancerd_http_port.into(), + name: Some("http".into()), + protocol: Some("TCP".into()), + ..Default::default() + }, + ContainerPort { + container_port: self.config.balancerd_internal_http_port.into(), + name: Some("internal-http".into()), + protocol: Some("TCP".into()), + ..Default::default() + }, + ]; + + let mut args = vec![ + "service".to_string(), + format!( + "--pgwire-listen-addr=0.0.0.0:{}", + self.config.balancerd_sql_port + ), + format!( + "--https-listen-addr=0.0.0.0:{}", + self.config.balancerd_http_port + ), + format!( + "--internal-http-listen-addr=0.0.0.0:{}", + self.config.balancerd_internal_http_port + ), + ]; + match balancer.routing()? { + Routing::Static(static_routing_config) => { + args.extend([ + format!( + "--https-resolver-template={}.{}.svc.cluster.local:{}", + static_routing_config.environmentd_service_name, + static_routing_config.environmentd_namespace, + self.config.environmentd_http_port + ), + format!( + "--static-resolver-addr={}.{}.svc.cluster.local:{}", + static_routing_config.environmentd_service_name, + static_routing_config.environmentd_namespace, + self.config.environmentd_sql_port + ), + ]); + } + Routing::Frontegg(_frontegg_routing_config) => { + bail!("frontegg routing is not yet implemented"); + } + } + + if issuer_ref_defined( + &self.config.default_certificate_specs.internal, + &balancer.spec.internal_certificate_spec, + ) { + args.push("--internal-tls".to_owned()) + } + + let mut volumes = Vec::new(); + let mut volume_mounts = Vec::new(); + if issuer_ref_defined( + &self.config.default_certificate_specs.balancerd_external, + &balancer.spec.external_certificate_spec, + ) { + volumes.push(Volume { + name: "external-certificate".to_owned(), + secret: Some(SecretVolumeSource { + default_mode: Some(0o400), + secret_name: Some(balancer.external_certificate_secret_name()), + items: None, + optional: Some(false), + }), + ..Default::default() + }); + volume_mounts.push(VolumeMount { + name: "external-certificate".to_owned(), + mount_path: "/etc/external_tls".to_owned(), + read_only: Some(true), + ..Default::default() + }); + args.extend([ + "--tls-mode=require".into(), + "--tls-cert=/etc/external_tls/tls.crt".into(), + "--tls-key=/etc/external_tls/tls.key".into(), + ]); + } else { + args.push("--tls-mode=disable".to_string()); + } + + let startup_probe = Probe { + http_get: Some(HTTPGetAction { + port: IntOrString::Int(self.config.balancerd_internal_http_port.into()), + path: Some("/api/readyz".into()), + ..Default::default() + }), + failure_threshold: Some(20), + initial_delay_seconds: Some(3), + period_seconds: Some(3), + success_threshold: Some(1), + timeout_seconds: Some(1), + ..Default::default() + }; + let readiness_probe = Probe { + http_get: Some(HTTPGetAction { + port: IntOrString::Int(self.config.balancerd_internal_http_port.into()), + path: Some("/api/readyz".into()), + ..Default::default() + }), + failure_threshold: Some(3), + period_seconds: Some(10), + success_threshold: Some(1), + timeout_seconds: Some(1), + ..Default::default() + }; + let liveness_probe = Probe { + http_get: Some(HTTPGetAction { + port: IntOrString::Int(self.config.balancerd_internal_http_port.into()), + path: Some("/api/livez".into()), + ..Default::default() + }), + failure_threshold: Some(3), + initial_delay_seconds: Some(8), + period_seconds: Some(10), + success_threshold: Some(1), + timeout_seconds: Some(1), + ..Default::default() + }; + + let container = Container { + name: "balancerd".to_owned(), + image: Some(balancer.spec.balancerd_image_ref.clone()), + image_pull_policy: Some(self.config.image_pull_policy.to_string()), + ports: Some(ports), + args: Some(args), + startup_probe: Some(startup_probe), + readiness_probe: Some(readiness_probe), + liveness_probe: Some(liveness_probe), + resources: balancer + .spec + .resource_requirements + .clone() + .or_else(|| self.config.balancerd_default_resources.clone()), + security_context: security_context.clone(), + volume_mounts: Some(volume_mounts), + ..Default::default() + }; + + let deployment_spec = DeploymentSpec { + replicas: Some(balancer.replicas()), + selector: LabelSelector { + match_labels: Some(pod_template_labels.clone()), + ..Default::default() + }, + strategy: Some(DeploymentStrategy { + rolling_update: Some(RollingUpdateDeployment { + // Allow a complete set of new pods at once, to minimize the + // chances of a new connection going to a pod that will be + // immediately drained + max_surge: Some(IntOrString::String("100%".into())), + ..Default::default() + }), + ..Default::default() + }), + template: PodTemplateSpec { + // not using managed_resource_meta because the pod should be + // owned by the deployment, not the materialize instance + metadata: Some(ObjectMeta { + annotations: pod_template_annotations, + labels: Some(pod_template_labels), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![container], + node_selector: Some( + self.config + .balancerd_node_selector + .iter() + .map(|selector| (selector.key.clone(), selector.value.clone())) + .collect(), + ), + affinity: self.config.balancerd_affinity.clone(), + tolerations: self.config.balancerd_tolerations.clone(), + security_context: Some(PodSecurityContext { + fs_group: Some(999), + run_as_user: Some(999), + run_as_group: Some(999), + ..Default::default() + }), + scheduler_name: self.config.scheduler_name.clone(), + volumes: Some(volumes), + ..Default::default() + }), + }, + ..Default::default() + }; + + Ok(Deployment { + metadata: balancer.managed_resource_meta(balancer.deployment_name()), + spec: Some(deployment_spec), + status: None, + }) + } + + fn create_service_object(&self, balancer: &Balancer) -> Service { + let selector = + btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()}; + + let ports = vec![ + ServicePort { + name: Some("http".to_string()), + protocol: Some("TCP".to_string()), + port: self.config.balancerd_http_port.into(), + target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())), + ..Default::default() + }, + ServicePort { + name: Some("pgwire".to_string()), + protocol: Some("TCP".to_string()), + port: self.config.balancerd_sql_port.into(), + target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())), + ..Default::default() + }, + ]; + + let spec = ServiceSpec { + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + selector: Some(selector), + ports: Some(ports), + ..Default::default() + }; + + Service { + metadata: balancer.managed_resource_meta(balancer.service_name()), + spec: Some(spec), + status: None, + } + } +} + +#[async_trait::async_trait] +impl k8s_controller::Context for Context { + type Resource = Balancer; + type Error = Error; + + #[instrument(fields())] + async fn apply( + &self, + client: Client, + balancer: &Self::Resource, + ) -> Result, Self::Error> { + if balancer.status.is_none() { + let balancer_api: Api = + Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap()); + let mut new_balancer = balancer.clone(); + new_balancer.status = Some(balancer.status()); + balancer_api + .replace_status( + &balancer.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&new_balancer).unwrap(), + ) + .await?; + // Updating the status should trigger a reconciliation + // which will include a status this time. + return Ok(None); + } + + let namespace = balancer.namespace().unwrap(); + let certificate_api: Api = Api::namespaced(client.clone(), &namespace); + let deployment_api: Api = Api::namespaced(client.clone(), &namespace); + let service_api: Api = Api::namespaced(client.clone(), &namespace); + + if let Some(external_certificate) = self.create_external_certificate_object(balancer) { + trace!("creating new balancerd external certificate"); + apply_resource(&certificate_api, &external_certificate).await?; + } + + let deployment = self.create_deployment_object(balancer)?; + trace!("creating new balancerd deployment"); + apply_resource(&deployment_api, &deployment).await?; + + let service = self.create_service_object(balancer); + trace!("creating new balancerd service"); + apply_resource(&service_api, &service).await?; + + self.sync_deployment_status(&client, balancer).await?; + + Ok(None) + } +} diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index fb73a851be3e2..84c0686ac3678 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -10,6 +10,7 @@ use std::{ collections::BTreeSet, sync::{Arc, Mutex}, + time::Duration, }; use anyhow::Context as _; @@ -27,19 +28,23 @@ use tracing::{debug, trace}; use uuid::Uuid; use crate::{ - DefaultCertificateSpecs, Error, controller::materialize::environmentd::V161, - k8s::make_reflector, matching_image_from_environmentd_image_ref, metrics::Metrics, + DefaultCertificateSpecs, Error, + controller::materialize::environmentd::V161, + k8s::{apply_resource, delete_resource, make_reflector}, + matching_image_from_environmentd_image_ref, + metrics::Metrics, }; use mz_cloud_provider::CloudProvider; -use mz_cloud_resources::crd::materialize::v1alpha1::{ - Materialize, MaterializeRolloutStrategy, MaterializeStatus, +use mz_cloud_resources::crd::{ + ManagedResource, + balancer::v1alpha1::{Balancer, BalancerSpec}, + materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus}, }; use mz_license_keys::validate; use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; use mz_orchestrator_tracing::TracingCliArgs; use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument}; -pub mod balancer; pub mod console; pub mod environmentd; @@ -79,10 +84,6 @@ pub struct Config { pub clusterd_node_selector: Vec>, pub clusterd_affinity: Option, pub clusterd_tolerations: Option>, - pub balancerd_node_selector: Vec>, - pub balancerd_affinity: Option, - pub balancerd_tolerations: Option>, - pub balancerd_default_resources: Option, pub console_node_selector: Vec>, pub console_affinity: Option, pub console_tolerations: Option>, @@ -115,9 +116,7 @@ pub struct Config { pub environmentd_internal_http_port: u16, pub environmentd_internal_persist_pubsub_port: u16, - pub balancerd_sql_port: u16, pub balancerd_http_port: u16, - pub balancerd_internal_http_port: u16, pub console_http_port: u16, @@ -277,6 +276,7 @@ impl k8s_controller::Context for Context { mz: &Self::Resource, ) -> Result, Self::Error> { let mz_api: Api = Api::namespaced(client.clone(), &mz.namespace()); + let balancer_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let secret_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let status = mz.status(); @@ -426,7 +426,7 @@ impl k8s_controller::Context for Context { // we fail later on, we want to ensure that the // rollout gets retried. last_completed_rollout_request: status.last_completed_rollout_request, - resource_id: status.resource_id, + resource_id: status.resource_id.clone(), resources_hash: String::new(), conditions: vec![Condition { type_: "UpToDate".into(), @@ -557,7 +557,7 @@ impl k8s_controller::Context for Context { MaterializeStatus { active_generation, last_completed_rollout_request: mz.requested_reconciliation_id(), - resource_id: status.resource_id, + resource_id: status.resource_id.clone(), resources_hash: status.resources_hash, conditions: vec![Condition { type_: "UpToDate".into(), @@ -597,7 +597,7 @@ impl k8s_controller::Context for Context { MaterializeStatus { active_generation, last_completed_rollout_request: mz.requested_reconciliation_id(), - resource_id: status.resource_id, + resource_id: status.resource_id.clone(), resources_hash: status.resources_hash, conditions: vec![Condition { type_: "UpToDate".into(), @@ -627,11 +627,36 @@ impl k8s_controller::Context for Context { // enforced by the environmentd rollout process being able to call // into the promotion endpoint - let balancer = balancer::Resources::new(&self.config, mz); if self.config.create_balancers { - result = balancer.apply(&client, &mz.namespace()).await?; + let balancer = Balancer { + metadata: mz.managed_resource_meta(mz.name_unchecked()), + spec: BalancerSpec { + balancerd_image_ref: matching_image_from_environmentd_image_ref( + &mz.spec.environmentd_image_ref, + "balancerd", + None, + ), + resource_requirements: mz.spec.balancerd_resource_requirements.clone(), + replicas: Some(mz.balancerd_replicas()), + external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(), + internal_certificate_spec: mz.spec.internal_certificate_spec.clone(), + pod_annotations: mz.spec.pod_annotations.clone(), + pod_labels: mz.spec.pod_labels.clone(), + static_routing: Some( + mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig { + environmentd_namespace: mz.namespace(), + environmentd_service_name: mz.environmentd_service_name(), + }, + ), + frontegg_routing: None, + resource_id: Some(status.resource_id), + }, + status: None, + }; + let balancer = apply_resource(&balancer_api, &balancer).await?; + result = wait_for_balancer(&balancer)?; } else { - result = balancer.cleanup(&client, &mz.namespace()).await?; + delete_resource(&balancer_api, &mz.name_prefixed("balancer")).await?; } if let Some(action) = result { @@ -639,8 +664,7 @@ impl k8s_controller::Context for Context { } // and the console relies on the balancer service existing, which is - // enforced by balancer::Resources::apply having a check for its pods - // being up, and not returning successfully until they are + // enforced by wait_for_balancer let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':') else { @@ -685,3 +709,20 @@ impl k8s_controller::Context for Context { Ok(None) } } + +fn wait_for_balancer(balancer: &Balancer) -> Result, Error> { + if let Some(conditions) = balancer + .status + .as_ref() + .map(|status| status.conditions.as_slice()) + { + if conditions + .iter() + .any(|condition| condition.type_ == "Ready" && condition.status == "True") + { + return Ok(None); + } + } + + Ok(Some(Action::requeue(Duration::from_secs(1)))) +} diff --git a/src/orchestratord/src/controller/materialize/balancer.rs b/src/orchestratord/src/controller/materialize/balancer.rs deleted file mode 100644 index 247221351ad5c..0000000000000 --- a/src/orchestratord/src/controller/materialize/balancer.rs +++ /dev/null @@ -1,411 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::time::Duration; - -use k8s_openapi::{ - api::{ - apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment}, - core::v1::{ - Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec, - PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service, - ServicePort, ServiceSpec, Volume, VolumeMount, - }, - }, - apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, -}; -use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action}; -use maplit::btreemap; -use tracing::trace; - -use crate::{ - controller::materialize::{Error, matching_image_from_environmentd_image_ref}, - k8s::{apply_resource, delete_resource, get_resource}, - tls::{create_certificate, issuer_ref_defined}, -}; -use mz_cloud_resources::crd::{ - ManagedResource, - generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, - materialize::v1alpha1::Materialize, -}; -use mz_ore::instrument; - -pub struct Resources { - balancerd_external_certificate: Box>, - balancerd_deployment: Box, - balancerd_service: Box, -} - -impl Resources { - pub fn new(config: &super::Config, mz: &Materialize) -> Self { - let balancerd_external_certificate = - Box::new(create_balancerd_external_certificate(config, mz)); - let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz)); - let balancerd_service = Box::new(create_balancerd_service_object(config, mz)); - - Self { - balancerd_external_certificate, - balancerd_deployment, - balancerd_service, - } - } - - #[instrument] - pub async fn apply(&self, client: &Client, namespace: &str) -> Result, Error> { - let certificate_api: Api = Api::namespaced(client.clone(), namespace); - let deployment_api: Api = Api::namespaced(client.clone(), namespace); - let service_api: Api = Api::namespaced(client.clone(), namespace); - - if let Some(certificate) = &*self.balancerd_external_certificate { - trace!("creating new balancerd external certificate"); - apply_resource(&certificate_api, certificate).await?; - } - - trace!("creating new balancerd deployment"); - apply_resource(&deployment_api, &*self.balancerd_deployment).await?; - - trace!("creating new balancerd service"); - apply_resource(&service_api, &*self.balancerd_service).await?; - - if let Some(deployment) = - get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await? - { - for condition in deployment - .status - .as_ref() - .and_then(|status| status.conditions.as_deref()) - .unwrap_or(&[]) - { - if condition.type_ == "Available" && condition.status == "True" { - return Ok(None); - } - } - } - - Ok(Some(Action::requeue(Duration::from_secs(1)))) - } - - #[instrument] - pub async fn cleanup( - &self, - client: &Client, - namespace: &str, - ) -> Result, anyhow::Error> { - let certificate_api: Api = Api::namespaced(client.clone(), namespace); - let deployment_api: Api = Api::namespaced(client.clone(), namespace); - let service_api: Api = Api::namespaced(client.clone(), namespace); - - trace!("deleting balancerd service"); - delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?; - - trace!("deleting balancerd deployment"); - delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?; - - if let Some(certificate) = &*self.balancerd_external_certificate { - trace!("deleting balancerd external certificate"); - delete_resource(&certificate_api, &certificate.name_unchecked()).await?; - } - - Ok(None) - } -} - -fn create_balancerd_external_certificate( - config: &super::Config, - mz: &Materialize, -) -> Option { - create_certificate( - config.default_certificate_specs.balancerd_external.clone(), - mz, - mz.spec.balancerd_external_certificate_spec.clone(), - mz.balancerd_external_certificate_name(), - mz.balancerd_external_certificate_secret_name(), - None, - CertificatePrivateKeyAlgorithm::Rsa, - Some(4096), - ) -} - -fn create_balancerd_deployment_object(config: &super::Config, mz: &Materialize) -> Deployment { - let security_context = if config.enable_security_context { - // Since we want to adhere to the most restrictive security context, all - // of these fields have to be set how they are. - // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted - Some(SecurityContext { - run_as_non_root: Some(true), - capabilities: Some(Capabilities { - drop: Some(vec!["ALL".to_string()]), - ..Default::default() - }), - seccomp_profile: Some(SeccompProfile { - type_: "RuntimeDefault".to_string(), - ..Default::default() - }), - allow_privilege_escalation: Some(false), - ..Default::default() - }) - } else { - None - }; - - let pod_template_annotations = if config.enable_prometheus_scrape_annotations { - Some(btreemap! { - "prometheus.io/scrape".to_owned() => "true".to_string(), - "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(), - "prometheus.io/path".to_owned() => "/metrics".to_string(), - "prometheus.io/scheme".to_owned() => "http".to_string(), - }) - } else { - None - }; - let mut pod_template_labels = mz.default_labels(); - pod_template_labels.insert( - "materialize.cloud/name".to_owned(), - mz.balancerd_deployment_name(), - ); - pod_template_labels.insert("app".to_owned(), "balancerd".to_string()); - pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name()); - - let ports = vec![ - ContainerPort { - container_port: config.balancerd_sql_port.into(), - name: Some("pgwire".into()), - protocol: Some("TCP".into()), - ..Default::default() - }, - ContainerPort { - container_port: config.balancerd_http_port.into(), - name: Some("http".into()), - protocol: Some("TCP".into()), - ..Default::default() - }, - ContainerPort { - container_port: config.balancerd_internal_http_port.into(), - name: Some("internal-http".into()), - protocol: Some("TCP".into()), - ..Default::default() - }, - ]; - - let mut args = vec![ - "service".to_string(), - format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port), - format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port), - format!( - "--internal-http-listen-addr=0.0.0.0:{}", - config.balancerd_internal_http_port - ), - format!( - "--https-resolver-template={}.{}.svc.cluster.local:{}", - mz.environmentd_service_name(), - mz.namespace(), - config.environmentd_http_port - ), - format!( - "--static-resolver-addr={}.{}.svc.cluster.local:{}", - mz.environmentd_service_name(), - mz.namespace(), - config.environmentd_sql_port - ), - ]; - - if issuer_ref_defined( - &config.default_certificate_specs.internal, - &mz.spec.internal_certificate_spec, - ) { - args.push("--internal-tls".to_owned()) - } - - let mut volumes = Vec::new(); - let mut volume_mounts = Vec::new(); - if issuer_ref_defined( - &config.default_certificate_specs.balancerd_external, - &mz.spec.balancerd_external_certificate_spec, - ) { - volumes.push(Volume { - name: "external-certificate".to_owned(), - secret: Some(SecretVolumeSource { - default_mode: Some(0o400), - secret_name: Some(mz.balancerd_external_certificate_secret_name()), - items: None, - optional: Some(false), - }), - ..Default::default() - }); - volume_mounts.push(VolumeMount { - name: "external-certificate".to_owned(), - mount_path: "/etc/external_tls".to_owned(), - read_only: Some(true), - ..Default::default() - }); - args.extend([ - "--tls-mode=require".into(), - "--tls-cert=/etc/external_tls/tls.crt".into(), - "--tls-key=/etc/external_tls/tls.key".into(), - ]); - } else { - args.push("--tls-mode=disable".to_string()); - } - - let startup_probe = Probe { - http_get: Some(HTTPGetAction { - port: IntOrString::Int(config.balancerd_internal_http_port.into()), - path: Some("/api/readyz".into()), - ..Default::default() - }), - failure_threshold: Some(20), - initial_delay_seconds: Some(3), - period_seconds: Some(3), - success_threshold: Some(1), - timeout_seconds: Some(1), - ..Default::default() - }; - let readiness_probe = Probe { - http_get: Some(HTTPGetAction { - port: IntOrString::Int(config.balancerd_internal_http_port.into()), - path: Some("/api/readyz".into()), - ..Default::default() - }), - failure_threshold: Some(3), - period_seconds: Some(10), - success_threshold: Some(1), - timeout_seconds: Some(1), - ..Default::default() - }; - let liveness_probe = Probe { - http_get: Some(HTTPGetAction { - port: IntOrString::Int(config.balancerd_internal_http_port.into()), - path: Some("/api/livez".into()), - ..Default::default() - }), - failure_threshold: Some(3), - initial_delay_seconds: Some(8), - period_seconds: Some(10), - success_threshold: Some(1), - timeout_seconds: Some(1), - ..Default::default() - }; - - let container = Container { - name: "balancerd".to_owned(), - image: Some(matching_image_from_environmentd_image_ref( - &mz.spec.environmentd_image_ref, - "balancerd", - None, - )), - image_pull_policy: Some(config.image_pull_policy.to_string()), - ports: Some(ports), - args: Some(args), - startup_probe: Some(startup_probe), - readiness_probe: Some(readiness_probe), - liveness_probe: Some(liveness_probe), - resources: mz - .spec - .balancerd_resource_requirements - .clone() - .or_else(|| config.balancerd_default_resources.clone()), - security_context: security_context.clone(), - volume_mounts: Some(volume_mounts), - ..Default::default() - }; - - let deployment_spec = DeploymentSpec { - replicas: Some(mz.balancerd_replicas()), - selector: LabelSelector { - match_labels: Some(pod_template_labels.clone()), - ..Default::default() - }, - strategy: Some(DeploymentStrategy { - rolling_update: Some(RollingUpdateDeployment { - // Allow a complete set of new pods at once, to minimize the - // chances of a new connection going to a pod that will be - // immediately drained - max_surge: Some(IntOrString::String("100%".into())), - ..Default::default() - }), - ..Default::default() - }), - template: PodTemplateSpec { - // not using managed_resource_meta because the pod should be - // owned by the deployment, not the materialize instance - metadata: Some(ObjectMeta { - annotations: pod_template_annotations, - labels: Some(pod_template_labels), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![container], - node_selector: Some( - config - .balancerd_node_selector - .iter() - .map(|selector| (selector.key.clone(), selector.value.clone())) - .collect(), - ), - affinity: config.balancerd_affinity.clone(), - tolerations: config.balancerd_tolerations.clone(), - security_context: Some(PodSecurityContext { - fs_group: Some(999), - run_as_user: Some(999), - run_as_group: Some(999), - ..Default::default() - }), - scheduler_name: config.scheduler_name.clone(), - service_account_name: Some(mz.service_account_name()), - volumes: Some(volumes), - ..Default::default() - }), - }, - ..Default::default() - }; - - Deployment { - metadata: ObjectMeta { - ..mz.managed_resource_meta(mz.balancerd_deployment_name()) - }, - spec: Some(deployment_spec), - status: None, - } -} - -fn create_balancerd_service_object(config: &super::Config, mz: &Materialize) -> Service { - let selector = - btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()}; - - let ports = vec![ - ServicePort { - name: Some("http".to_string()), - protocol: Some("TCP".to_string()), - port: config.balancerd_http_port.into(), - target_port: Some(IntOrString::Int(config.balancerd_http_port.into())), - ..Default::default() - }, - ServicePort { - name: Some("pgwire".to_string()), - protocol: Some("TCP".to_string()), - port: config.balancerd_sql_port.into(), - target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())), - ..Default::default() - }, - ]; - - let spec = ServiceSpec { - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), - selector: Some(selector), - ports: Some(ports), - ..Default::default() - }; - - Service { - metadata: mz.managed_resource_meta(mz.balancerd_service_name()), - spec: Some(spec), - status: None, - } -} diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs index 59f52e4412c1a..96979fa223b72 100644 --- a/src/orchestratord/src/controller/materialize/environmentd.rs +++ b/src/orchestratord/src/controller/materialize/environmentd.rs @@ -495,7 +495,18 @@ fn create_environmentd_network_policies( // TODO (Alex) filter to just clusterd and environmentd, // once we get a consistent set of labels for both. let all_pods_label_selector = LabelSelector { - match_labels: Some(mz.default_labels()), + // TODO: can't use default_labels() here because it needs to be + // consistent between balancer and materialize resources, and + // materialize resources have additional labels - we should + // figure out something better here (probably balancers should + // install their own network policies) + match_labels: Some( + [( + "materialize.cloud/mz-resource-id".to_owned(), + mz.resource_id().to_owned(), + )] + .into(), + ), ..Default::default() }; network_policies.extend([ diff --git a/src/orchestratord/src/k8s.rs b/src/orchestratord/src/k8s.rs index 540b281123b0b..f2c8be6ea7d1d 100644 --- a/src/orchestratord/src/k8s.rs +++ b/src/orchestratord/src/k8s.rs @@ -35,18 +35,18 @@ where } } -pub async fn apply_resource(api: &Api, resource: &K) -> Result<(), anyhow::Error> +pub async fn apply_resource(api: &Api, resource: &K) -> Result where K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static, ::DynamicType: Default, { - api.patch( - &resource.name_unchecked(), - &PatchParams::apply(FIELD_MANAGER).force(), - &Patch::Apply(resource), - ) - .await?; - Ok(()) + Ok(api + .patch( + &resource.name_unchecked(), + &PatchParams::apply(FIELD_MANAGER).force(), + &Patch::Apply(resource), + ) + .await?) } pub async fn delete_resource(api: &Api, name: &str) -> Result<(), anyhow::Error> @@ -94,6 +94,10 @@ pub async fn register_crds( crds: vec![mz_crd], stored_version: String::from("v1alpha1"), }, + VersionedCrd { + crds: vec![crd::balancer::v1alpha1::Balancer::crd()], + stored_version: String::from("v1alpha1"), + }, VersionedCrd { crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()], stored_version: String::from("v1"),