diff --git a/misc/helm-charts/operator/templates/clusterrole.yaml b/misc/helm-charts/operator/templates/clusterrole.yaml index b42ec630fd608..ce265f5d89ebc 100644 --- a/misc/helm-charts/operator/templates/clusterrole.yaml +++ b/misc/helm-charts/operator/templates/clusterrole.yaml @@ -90,6 +90,8 @@ rules: - materializes/status - balancers - balancers/status + - consoles + - consoles/status - vpcendpoints verbs: - create diff --git a/src/cloud-resources/src/crd.rs b/src/cloud-resources/src/crd.rs index d5f6866b30ace..b59615597a44b 100644 --- a/src/cloud-resources/src/crd.rs +++ b/src/cloud-resources/src/crd.rs @@ -32,6 +32,7 @@ use crate::crd::generated::cert_manager::certificates::{ use mz_ore::retry::Retry; pub mod balancer; +pub mod console; pub mod generated; pub mod materialize; #[cfg(feature = "vpc-endpoints")] diff --git a/src/cloud-resources/src/crd/console.rs b/src/cloud-resources/src/crd/console.rs new file mode 100644 index 0000000000000..8abda35651258 --- /dev/null +++ b/src/cloud-resources/src/crd/console.rs @@ -0,0 +1,145 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use k8s_openapi::{ + api::core::v1::ResourceRequirements, apimachinery::pkg::apis::meta::v1::Condition, +}; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id}; +use mz_server_core::listeners::AuthenticatorKind; + +pub mod v1alpha1 { + use super::*; + + #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)] + #[serde(rename_all = "camelCase")] + pub struct BalancerdRef { + // The service name for the balancerd service to connect to + pub service_name: String, + // The namespace the balancerd service runs in + pub namespace: String, + // The configuration for generating an x509 certificate using cert-manager for balancerd + // to present to incoming connections. + // The dns_names and issuer_ref fields are required. + pub external_certificate_spec: Option, + /// How to authenticate with Materialize. + #[serde(default)] + pub authenticator_kind: AuthenticatorKind, + } + + #[derive( + CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema, + )] + #[serde(rename_all = "camelCase")] + #[kube( + namespaced, + group = "materialize.cloud", + version = "v1alpha1", + kind = "Console", + singular = "console", + plural = "consoles", + status = "ConsoleStatus", + printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.consoleImageRef", "priority": 1}"#, + printcolumn = r#"{"name": "Ready", "type": "string", "description": "Whether the deployment is ready", "jsonPath": ".status.conditions[?(@.type==\"Ready\")].status", "priority": 1}"# + )] + pub struct ConsoleSpec { + /// The console image to run. + pub console_image_ref: String, + // Resource requirements for the console pod + pub resource_requirements: Option, + // Number of console pods to create + pub replicas: Option, + // The configuration for generating an x509 certificate using cert-manager for console + // to present to incoming connections. + // The dns_names and issuer_ref fields are required. + pub external_certificate_spec: Option, + // Annotations to apply to the pods + pub pod_annotations: Option>, + // Labels to apply to the pods + pub pod_labels: Option>, + + // Connection information for the balancerd service to use + pub balancerd: BalancerdRef, + + // This can be set to override the randomly chosen resource id + pub resource_id: Option, + } + + impl Console { + pub fn name_prefixed(&self, suffix: &str) -> String { + format!("mz{}-{}", self.resource_id(), suffix) + } + + pub fn resource_id(&self) -> &str { + &self.status.as_ref().unwrap().resource_id + } + + pub fn deployment_name(&self) -> String { + self.name_prefixed("console") + } + + pub fn replicas(&self) -> i32 { + self.spec.replicas.unwrap_or(2) + } + + pub fn app_name(&self) -> String { + "console".to_owned() + } + + pub fn service_name(&self) -> String { + self.name_prefixed("console") + } + + pub fn configmap_name(&self) -> String { + self.name_prefixed("console") + } + + pub fn external_certificate_name(&self) -> String { + self.name_prefixed("console-external") + } + + pub fn external_certificate_secret_name(&self) -> String { + self.name_prefixed("console-external-tls") + } + + pub fn status(&self) -> ConsoleStatus { + self.status.clone().unwrap_or_else(|| ConsoleStatus { + resource_id: self + .spec + .resource_id + .clone() + .unwrap_or_else(new_resource_id), + conditions: vec![], + }) + } + } + + #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)] + #[serde(rename_all = "camelCase")] + pub struct ConsoleStatus { + /// Resource identifier used as a name prefix to avoid pod name collisions. + pub resource_id: String, + + pub conditions: Vec, + } + + impl ManagedResource for Console { + fn default_labels(&self) -> BTreeMap { + BTreeMap::from_iter([( + "materialize.cloud/mz-resource-id".to_owned(), + self.resource_id().to_owned(), + )]) + } + } +} diff --git a/src/orchestratord/src/bin/orchestratord.rs b/src/orchestratord/src/bin/orchestratord.rs index 30e06301fb199..f967dc3e7235d 100644 --- a/src/orchestratord/src/bin/orchestratord.rs +++ b/src/orchestratord/src/bin/orchestratord.rs @@ -17,7 +17,8 @@ use http::HeaderValue; use k8s_openapi::{ api::{ apps::v1::Deployment, - core::v1::{Affinity, ResourceRequirements, Service, Toleration}, + core::v1::{Affinity, ConfigMap, ResourceRequirements, Service, Toleration}, + networking::v1::NetworkPolicy, }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition, }; @@ -335,14 +336,10 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { clusterd_node_selector: args.clusterd_node_selector, clusterd_affinity: args.clusterd_affinity, clusterd_tolerations: args.clusterd_tolerations, - console_node_selector: args.console_node_selector, - console_affinity: args.console_affinity, - console_tolerations: args.console_tolerations, - console_default_resources: args.console_default_resources, image_pull_policy: args.image_pull_policy, network_policies_internal_enabled: args.network_policies_internal_enabled, network_policies_ingress_enabled: args.network_policies_ingress_enabled, - network_policies_ingress_cidrs: args.network_policies_ingress_cidrs, + network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(), network_policies_egress_enabled: args.network_policies_egress_enabled, network_policies_egress_cidrs: args.network_policies_egress_cidrs, environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes, @@ -374,8 +371,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { environmentd_internal_http_port: args.environmentd_internal_http_port, environmentd_internal_persist_pubsub_port: args .environmentd_internal_persist_pubsub_port, - balancerd_http_port: args.balancerd_http_port, - console_http_port: args.console_http_port, default_certificate_specs: args.default_certificate_specs.clone(), disable_license_key_checks: args.disable_license_key_checks, tracing: args.tracing, @@ -399,12 +394,12 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { enable_security_context: args.enable_security_context, enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations, image_pull_policy: args.image_pull_policy, - scheduler_name: args.scheduler_name, + scheduler_name: args.scheduler_name.clone(), balancerd_node_selector: args.balancerd_node_selector, balancerd_affinity: args.balancerd_affinity, balancerd_tolerations: args.balancerd_tolerations, balancerd_default_resources: args.balancerd_default_resources, - default_certificate_specs: args.default_certificate_specs, + default_certificate_specs: args.default_certificate_specs.clone(), environmentd_sql_port: args.environmentd_sql_port, environmentd_http_port: args.environmentd_http_port, balancerd_sql_port: args.balancerd_sql_port, @@ -440,6 +435,67 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { .run(), ); + mz_ore::task::spawn( + || "console controller", + k8s_controller::Controller::namespaced_all( + client.clone(), + controller::console::Context::new( + controller::console::Config { + enable_security_context: args.enable_security_context, + enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations, + image_pull_policy: args.image_pull_policy, + scheduler_name: args.scheduler_name, + console_node_selector: args.console_node_selector, + console_affinity: args.console_affinity, + console_tolerations: args.console_tolerations, + console_default_resources: args.console_default_resources, + network_policies_ingress_enabled: args.network_policies_ingress_enabled, + network_policies_ingress_cidrs: args.network_policies_ingress_cidrs, + default_certificate_specs: args.default_certificate_specs, + console_http_port: args.console_http_port, + balancerd_http_port: args.balancerd_http_port, + }, + client.clone(), + ) + .await, + watcher::Config::default().timeout(29), + ) + .with_controller(|controller| { + controller + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + }) + .run(), + ); + info!("All tasks started successfully."); future::pending().await diff --git a/src/orchestratord/src/controller.rs b/src/orchestratord/src/controller.rs index 3ada1742d52d0..a4d0b8c23bb4e 100644 --- a/src/orchestratord/src/controller.rs +++ b/src/orchestratord/src/controller.rs @@ -8,4 +8,5 @@ // by the Apache License, Version 2.0. pub mod balancer; +pub mod console; pub mod materialize; diff --git a/src/orchestratord/src/controller/console.rs b/src/orchestratord/src/controller/console.rs new file mode 100644 index 0000000000000..7758629e61abc --- /dev/null +++ b/src/orchestratord/src/controller/console.rs @@ -0,0 +1,544 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use k8s_openapi::{ + api::{ + apps::v1::{Deployment, DeploymentSpec}, + core::v1::{ + Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, + EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe, + ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service, + ServicePort, ServiceSpec, Toleration, Volume, VolumeMount, + }, + networking::v1::{ + IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort, + NetworkPolicySpec, + }, + }, + apimachinery::pkg::{ + apis::meta::v1::{Condition, LabelSelector, Time}, + util::intstr::IntOrString, + }, +}; +use kube::{ + Api, Client, Resource, ResourceExt, + api::{ObjectMeta, PostParams}, + runtime::{ + controller::Action, + reflector::{ObjectRef, Store}, + }, +}; +use maplit::btreemap; +use serde::Serialize; +use tracing::trace; + +use crate::{ + Error, + k8s::{apply_resource, make_reflector}, + tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined}, +}; +use mz_cloud_resources::crd::{ + ManagedResource, + console::v1alpha1::Console, + generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, +}; +use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; +use mz_ore::{cli::KeyValueArg, instrument}; +use mz_server_core::listeners::AuthenticatorKind; + +pub struct Config { + pub enable_security_context: bool, + pub enable_prometheus_scrape_annotations: bool, + + pub image_pull_policy: KubernetesImagePullPolicy, + pub scheduler_name: Option, + pub console_node_selector: Vec>, + pub console_affinity: Option, + pub console_tolerations: Option>, + pub console_default_resources: Option, + pub network_policies_ingress_enabled: bool, + pub network_policies_ingress_cidrs: Vec, + + pub default_certificate_specs: DefaultCertificateSpecs, + + pub console_http_port: u16, + pub balancerd_http_port: u16, +} + +#[derive(Serialize)] +struct AppConfig { + version: String, + auth: AppConfigAuth, +} + +#[derive(Serialize)] +struct AppConfigAuth { + mode: AuthenticatorKind, +} + +pub struct Context { + config: Config, + deployments: Store, +} + +impl Context { + pub async fn new(config: Config, client: Client) -> Self { + Self { + config, + deployments: make_reflector(client).await, + } + } + + async fn sync_deployment_status( + &self, + client: &Client, + console: &Console, + ) -> Result<(), kube::Error> { + let namespace = console.namespace().unwrap(); + let console_api: Api = Api::namespaced(client.clone(), &namespace); + + let Some(deployment) = self + .deployments + .get(&ObjectRef::new(&console.deployment_name()).within(&namespace)) + else { + return Ok(()); + }; + + let Some(deployment_conditions) = &deployment + .status + .as_ref() + .and_then(|status| status.conditions.as_ref()) + else { + // if the deployment doesn't have any conditions set yet, there + // is nothing to sync + return Ok(()); + }; + + let ready = deployment_conditions + .iter() + .any(|condition| condition.type_ == "Available" && condition.status == "True"); + let ready_str = if ready { "True" } else { "False" }; + + let mut status = console.status.clone().unwrap(); + if status + .conditions + .iter() + .any(|condition| condition.type_ == "Ready" && condition.status == ready_str) + { + // if the deployment status is already set correctly, we don't + // need to set it again (this prevents us from getting stuck in + // a reconcile loop) + return Ok(()); + } + + status.conditions = vec![Condition { + type_: "Ready".to_string(), + status: ready_str.to_string(), + last_transition_time: Time(chrono::offset::Utc::now()), + message: format!( + "console deployment is{} ready", + if ready { "" } else { " not" } + ), + observed_generation: None, + reason: "DeploymentStatus".to_string(), + }]; + let mut new_console = console.clone(); + new_console.status = Some(status); + + console_api + .replace_status( + &console.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&new_console).unwrap(), + ) + .await?; + + Ok(()) + } + + fn create_network_policies(&self, console: &Console) -> Vec { + let mut network_policies = Vec::new(); + if self.config.network_policies_ingress_enabled { + let console_label_selector = LabelSelector { + match_labels: Some( + console + .default_labels() + .into_iter() + .chain([("materialize.cloud/app".to_owned(), console.app_name())]) + .collect(), + ), + ..Default::default() + }; + network_policies.extend([NetworkPolicy { + metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")), + spec: Some(NetworkPolicySpec { + ingress: Some(vec![NetworkPolicyIngressRule { + from: Some( + self.config + .network_policies_ingress_cidrs + .iter() + .map(|cidr| NetworkPolicyPeer { + ip_block: Some(IPBlock { + cidr: cidr.to_owned(), + except: None, + }), + ..Default::default() + }) + .collect(), + ), + ports: Some(vec![NetworkPolicyPort { + port: Some(IntOrString::Int(self.config.console_http_port.into())), + protocol: Some("TCP".to_string()), + ..Default::default() + }]), + ..Default::default() + }]), + pod_selector: Some(console_label_selector), + policy_types: Some(vec!["Ingress".to_owned()]), + ..Default::default() + }), + }]); + } + network_policies + } + + fn create_console_external_certificate(&self, console: &Console) -> Option { + create_certificate( + self.config + .default_certificate_specs + .console_external + .clone(), + console, + console.spec.external_certificate_spec.clone(), + console.external_certificate_name(), + console.external_certificate_secret_name(), + None, + CertificatePrivateKeyAlgorithm::Rsa, + Some(4096), + ) + } + + fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap { + let version: String = console + .spec + .console_image_ref + .rsplitn(2, ':') + .next() + .expect("at least one chunk, even if empty") + .to_owned(); + let app_config_json = serde_json::to_string(&AppConfig { + version, + auth: AppConfigAuth { + mode: console.spec.balancerd.authenticator_kind, + }, + }) + .expect("known valid"); + ConfigMap { + binary_data: None, + data: Some(btreemap! { + "app-config.json".to_owned() => app_config_json, + }), + immutable: None, + metadata: console.managed_resource_meta(console.configmap_name()), + } + } + + fn create_console_deployment_object(&self, console: &Console) -> Deployment { + let mut pod_template_labels = console.default_labels(); + pod_template_labels.insert( + "materialize.cloud/name".to_owned(), + console.deployment_name(), + ); + pod_template_labels.insert("app".to_owned(), "console".to_string()); + pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name()); + + let ports = vec![ContainerPort { + container_port: self.config.console_http_port.into(), + name: Some("http".into()), + protocol: Some("TCP".into()), + ..Default::default() + }]; + + let scheme = if issuer_ref_defined( + &self.config.default_certificate_specs.balancerd_external, + &console.spec.balancerd.external_certificate_spec, + ) { + "https" + } else { + "http" + }; + let mut env = vec![EnvVar { + name: "MZ_ENDPOINT".to_string(), + value: Some(format!( + "{}://{}.{}.svc.cluster.local:{}", + scheme, + console.spec.balancerd.service_name, + console.spec.balancerd.namespace, + self.config.balancerd_http_port, + )), + ..Default::default() + }]; + let mut volumes = vec![Volume { + name: "app-config".to_string(), + config_map: Some(ConfigMapVolumeSource { + name: console.configmap_name(), + default_mode: Some(256), + optional: Some(false), + items: Some(vec![KeyToPath { + key: "app-config.json".to_string(), + path: "app-config.json".to_string(), + ..Default::default() + }]), + }), + ..Default::default() + }]; + let mut volume_mounts = vec![VolumeMount { + name: "app-config".to_string(), + mount_path: "/usr/share/nginx/html/app-config".to_string(), + ..Default::default() + }]; + + let scheme = if issuer_ref_defined( + &self.config.default_certificate_specs.console_external, + &console.spec.external_certificate_spec, + ) { + volumes.push(Volume { + name: "external-certificate".to_owned(), + secret: Some(SecretVolumeSource { + default_mode: Some(0o400), + secret_name: Some(console.external_certificate_secret_name()), + items: None, + optional: Some(false), + }), + ..Default::default() + }); + volume_mounts.push(VolumeMount { + name: "external-certificate".to_owned(), + mount_path: "/nginx/tls".to_owned(), + read_only: Some(true), + ..Default::default() + }); + env.push(EnvVar { + name: "MZ_NGINX_LISTENER_CONFIG".to_string(), + value: Some(format!( + "listen {} ssl; +ssl_certificate /nginx/tls/tls.crt; +ssl_certificate_key /nginx/tls/tls.key;", + self.config.console_http_port + )), + ..Default::default() + }); + Some("HTTPS".to_owned()) + } else { + env.push(EnvVar { + name: "MZ_NGINX_LISTENER_CONFIG".to_string(), + value: Some(format!("listen {};", self.config.console_http_port)), + ..Default::default() + }); + Some("HTTP".to_owned()) + }; + + let probe = Probe { + http_get: Some(HTTPGetAction { + path: Some("/".to_string()), + port: IntOrString::Int(self.config.console_http_port.into()), + scheme, + ..Default::default() + }), + ..Default::default() + }; + + let security_context = if self.config.enable_security_context { + // Since we want to adhere to the most restrictive security context, all + // of these fields have to be set how they are. + // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted + Some(SecurityContext { + run_as_non_root: Some(true), + capabilities: Some(Capabilities { + drop: Some(vec!["ALL".to_string()]), + ..Default::default() + }), + seccomp_profile: Some(SeccompProfile { + type_: "RuntimeDefault".to_string(), + ..Default::default() + }), + allow_privilege_escalation: Some(false), + ..Default::default() + }) + } else { + None + }; + + let container = Container { + name: "console".to_owned(), + image: Some(console.spec.console_image_ref.clone()), + image_pull_policy: Some(self.config.image_pull_policy.to_string()), + ports: Some(ports), + env: Some(env), + startup_probe: Some(Probe { + period_seconds: Some(1), + failure_threshold: Some(10), + ..probe.clone() + }), + readiness_probe: Some(Probe { + period_seconds: Some(30), + failure_threshold: Some(1), + ..probe.clone() + }), + liveness_probe: Some(Probe { + period_seconds: Some(30), + ..probe.clone() + }), + resources: console + .spec + .resource_requirements + .clone() + .or_else(|| self.config.console_default_resources.clone()), + security_context, + volume_mounts: Some(volume_mounts), + ..Default::default() + }; + + let deployment_spec = DeploymentSpec { + replicas: Some(console.replicas()), + selector: LabelSelector { + match_labels: Some(pod_template_labels.clone()), + ..Default::default() + }, + template: PodTemplateSpec { + // not using managed_resource_meta because the pod should be + // owned by the deployment, not the materialize instance + metadata: Some(ObjectMeta { + labels: Some(pod_template_labels), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![container], + node_selector: Some( + self.config + .console_node_selector + .iter() + .map(|selector| (selector.key.clone(), selector.value.clone())) + .collect(), + ), + affinity: self.config.console_affinity.clone(), + tolerations: self.config.console_tolerations.clone(), + scheduler_name: self.config.scheduler_name.clone(), + volumes: Some(volumes), + security_context: Some(PodSecurityContext { + fs_group: Some(101), + ..Default::default() + }), + ..Default::default() + }), + }, + ..Default::default() + }; + + Deployment { + metadata: ObjectMeta { + ..console.managed_resource_meta(console.deployment_name()) + }, + spec: Some(deployment_spec), + status: None, + } + } + + fn create_console_service_object(&self, console: &Console) -> Service { + let selector = + btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()}; + + let ports = vec![ServicePort { + name: Some("http".to_string()), + protocol: Some("TCP".to_string()), + port: self.config.console_http_port.into(), + target_port: Some(IntOrString::Int(self.config.console_http_port.into())), + ..Default::default() + }]; + + let spec = ServiceSpec { + type_: Some("ClusterIP".to_string()), + cluster_ip: Some("None".to_string()), + selector: Some(selector), + ports: Some(ports), + ..Default::default() + }; + + Service { + metadata: console.managed_resource_meta(console.service_name()), + spec: Some(spec), + status: None, + } + } +} + +#[async_trait::async_trait] +impl k8s_controller::Context for Context { + type Resource = Console; + type Error = Error; + + #[instrument(fields())] + async fn apply( + &self, + client: Client, + console: &Self::Resource, + ) -> Result, Self::Error> { + if console.status.is_none() { + let console_api: Api = + Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap()); + let mut new_console = console.clone(); + new_console.status = Some(console.status()); + console_api + .replace_status( + &console.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&new_console).unwrap(), + ) + .await?; + // Updating the status should trigger a reconciliation + // which will include a status this time. + return Ok(None); + } + + let namespace = console.namespace().unwrap(); + let network_policy_api: Api = Api::namespaced(client.clone(), &namespace); + let configmap_api: Api = Api::namespaced(client.clone(), &namespace); + let deployment_api: Api = Api::namespaced(client.clone(), &namespace); + let service_api: Api = Api::namespaced(client.clone(), &namespace); + let certificate_api: Api = Api::namespaced(client.clone(), &namespace); + + trace!("creating new network policies"); + let network_policies = self.create_network_policies(console); + for network_policy in &network_policies { + apply_resource(&network_policy_api, network_policy).await?; + } + + trace!("creating new console configmap"); + let console_configmap = self.create_console_app_configmap_object(console); + apply_resource(&configmap_api, &console_configmap).await?; + + trace!("creating new console deployment"); + let console_deployment = self.create_console_deployment_object(console); + apply_resource(&deployment_api, &console_deployment).await?; + + trace!("creating new console service"); + let console_service = self.create_console_service_object(console); + apply_resource(&service_api, &console_service).await?; + + let console_external_certificate = self.create_console_external_certificate(console); + if let Some(certificate) = &console_external_certificate { + trace!("creating new console external certificate"); + apply_resource(&certificate_api, certificate).await?; + } + + self.sync_deployment_status(&client, console).await?; + + Ok(None) + } +} diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 9afdd3f40d4ef..e37340ef345aa 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -39,6 +39,7 @@ use mz_cloud_provider::CloudProvider; use mz_cloud_resources::crd::{ ManagedResource, balancer::v1alpha1::{Balancer, BalancerSpec}, + console::v1alpha1::{BalancerdRef, Console, ConsoleSpec}, materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus}, }; use mz_license_keys::validate; @@ -46,7 +47,6 @@ use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; use mz_orchestrator_tracing::TracingCliArgs; use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument}; -pub mod console; pub mod environmentd; pub struct Config { @@ -85,10 +85,6 @@ pub struct Config { pub clusterd_node_selector: Vec>, pub clusterd_affinity: Option, pub clusterd_tolerations: Option>, - pub console_node_selector: Vec>, - pub console_affinity: Option, - pub console_tolerations: Option>, - pub console_default_resources: Option, pub image_pull_policy: KubernetesImagePullPolicy, pub network_policies_internal_enabled: bool, pub network_policies_ingress_enabled: bool, @@ -117,10 +113,6 @@ pub struct Config { pub environmentd_internal_http_port: u16, pub environmentd_internal_persist_pubsub_port: u16, - pub balancerd_http_port: u16, - - pub console_http_port: u16, - pub default_certificate_specs: DefaultCertificateSpecs, pub disable_license_key_checks: bool, @@ -281,6 +273,7 @@ impl k8s_controller::Context for Context { ) -> Result, Self::Error> { let mz_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let balancer_api: Api = Api::namespaced(client.clone(), &mz.namespace()); + let console_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let secret_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let status = mz.status(); @@ -695,7 +688,7 @@ impl k8s_controller::Context for Context { }, ), frontegg_routing: None, - resource_id: Some(status.resource_id), + resource_id: Some(status.resource_id.clone()), }, status: None, }; @@ -712,33 +705,50 @@ impl k8s_controller::Context for Context { // and the console relies on the balancer service existing, which is // enforced by wait_for_balancer - let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':') - else { - return Err(Error::Anyhow(anyhow::anyhow!( - "failed to parse environmentd image ref: {}", - mz.spec.environmentd_image_ref - ))); - }; - let console_image_tag = self - .config - .console_image_tag_map - .iter() - .find(|kv| kv.key == environmentd_image_tag) - .map(|kv| kv.value.clone()) - .unwrap_or_else(|| self.config.console_image_tag_default.clone()); - let console = console::Resources::new( - &self.config, - mz, - &matching_image_from_environmentd_image_ref( - &mz.spec.environmentd_image_ref, - "console", - Some(&console_image_tag), - ), - ); if self.config.create_console { - console.apply(&client, &mz.namespace()).await?; + let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':') + else { + return Err(Error::Anyhow(anyhow::anyhow!( + "failed to parse environmentd image ref: {}", + mz.spec.environmentd_image_ref + ))); + }; + let console_image_tag = self + .config + .console_image_tag_map + .iter() + .find(|kv| kv.key == environmentd_image_tag) + .map(|kv| kv.value.clone()) + .unwrap_or_else(|| self.config.console_image_tag_default.clone()); + let console = Console { + metadata: mz.managed_resource_meta(mz.name_unchecked()), + spec: ConsoleSpec { + console_image_ref: matching_image_from_environmentd_image_ref( + &mz.spec.environmentd_image_ref, + "console", + Some(&console_image_tag), + ), + resource_requirements: mz.spec.console_resource_requirements.clone(), + replicas: Some(mz.console_replicas()), + external_certificate_spec: mz.spec.console_external_certificate_spec.clone(), + pod_annotations: mz.spec.pod_annotations.clone(), + pod_labels: mz.spec.pod_labels.clone(), + balancerd: BalancerdRef { + service_name: mz.balancerd_service_name(), + namespace: mz.namespace(), + external_certificate_spec: mz + .spec + .balancerd_external_certificate_spec + .clone(), + authenticator_kind: mz.spec.authenticator_kind, + }, + resource_id: Some(status.resource_id), + }, + status: None, + }; + apply_resource(&console_api, &console).await?; } else { - console.cleanup(&client, &mz.namespace()).await?; + delete_resource(&console_api, &mz.name_prefixed("console")).await?; } Ok(result) diff --git a/src/orchestratord/src/controller/materialize/console.rs b/src/orchestratord/src/controller/materialize/console.rs deleted file mode 100644 index 18c1a2631c42f..0000000000000 --- a/src/orchestratord/src/controller/materialize/console.rs +++ /dev/null @@ -1,464 +0,0 @@ -// Copyright Materialize, Inc. and contributors. All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use k8s_openapi::{ - api::{ - apps::v1::{Deployment, DeploymentSpec}, - core::v1::{ - Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar, - HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe, - SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, - Volume, VolumeMount, - }, - networking::v1::{ - IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort, - NetworkPolicySpec, - }, - }, - apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, -}; -use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action}; -use maplit::btreemap; -use mz_server_core::listeners::AuthenticatorKind; -use serde::Serialize; -use tracing::trace; - -use crate::{ - k8s::{apply_resource, delete_resource}, - tls::{create_certificate, issuer_ref_defined}, -}; -use mz_cloud_resources::crd::{ - ManagedResource, - generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, - materialize::v1alpha1::Materialize, -}; - -pub struct Resources { - network_policies: Vec, - console_configmap: Box, - console_deployment: Box, - console_service: Box, - console_external_certificate: Box>, -} - -impl Resources { - pub fn new(config: &super::Config, mz: &Materialize, console_image_ref: &str) -> Self { - let network_policies = create_network_policies(config, mz); - let console_configmap = - Box::new(create_console_app_configmap_object(mz, console_image_ref)); - let console_deployment = Box::new(create_console_deployment_object( - config, - mz, - console_image_ref, - )); - let console_service = Box::new(create_console_service_object(config, mz)); - let console_external_certificate = - Box::new(create_console_external_certificate(config, mz)); - Self { - network_policies, - console_configmap, - console_deployment, - console_service, - console_external_certificate, - } - } - - pub async fn apply( - &self, - client: &Client, - namespace: &str, - ) -> Result, anyhow::Error> { - let network_policy_api: Api = Api::namespaced(client.clone(), namespace); - let configmap_api: Api = Api::namespaced(client.clone(), namespace); - let deployment_api: Api = Api::namespaced(client.clone(), namespace); - let service_api: Api = Api::namespaced(client.clone(), namespace); - let certificate_api: Api = Api::namespaced(client.clone(), namespace); - - trace!("creating new network policies"); - for network_policy in &self.network_policies { - apply_resource(&network_policy_api, network_policy).await?; - } - - trace!("creating new console configmap"); - apply_resource(&configmap_api, &self.console_configmap).await?; - - trace!("creating new console deployment"); - apply_resource(&deployment_api, &self.console_deployment).await?; - - trace!("creating new console service"); - apply_resource(&service_api, &self.console_service).await?; - - if let Some(certificate) = &*self.console_external_certificate { - trace!("creating new console external certificate"); - apply_resource(&certificate_api, certificate).await?; - } - - Ok(None) - } - - pub async fn cleanup( - &self, - client: &Client, - namespace: &str, - ) -> Result, anyhow::Error> { - let network_policy_api: Api = Api::namespaced(client.clone(), namespace); - let configmap_api: Api = Api::namespaced(client.clone(), namespace); - let deployment_api: Api = Api::namespaced(client.clone(), namespace); - let service_api: Api = Api::namespaced(client.clone(), namespace); - let certificate_api: Api = Api::namespaced(client.clone(), namespace); - - if let Some(certificate) = &*self.console_external_certificate { - trace!("deleting console external certificate"); - delete_resource(&certificate_api, &certificate.name_unchecked()).await?; - } - - trace!("deleting console service"); - delete_resource(&service_api, &self.console_service.name_unchecked()).await?; - - trace!("deleting console deployment"); - delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?; - - trace!("deleting console configmap"); - delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?; - - trace!("deleting network policies"); - for network_policy in &self.network_policies { - delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?; - } - - Ok(None) - } -} - -fn create_network_policies(config: &super::Config, mz: &Materialize) -> Vec { - let mut network_policies = Vec::new(); - if config.network_policies_ingress_enabled { - let console_label_selector = LabelSelector { - match_labels: Some( - mz.default_labels() - .into_iter() - .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())]) - .collect(), - ), - ..Default::default() - }; - network_policies.extend([NetworkPolicy { - metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")), - spec: Some(NetworkPolicySpec { - ingress: Some(vec![NetworkPolicyIngressRule { - from: Some( - config - .network_policies_ingress_cidrs - .iter() - .map(|cidr| NetworkPolicyPeer { - ip_block: Some(IPBlock { - cidr: cidr.to_owned(), - except: None, - }), - ..Default::default() - }) - .collect(), - ), - ports: Some(vec![NetworkPolicyPort { - port: Some(IntOrString::Int(config.console_http_port.into())), - protocol: Some("TCP".to_string()), - ..Default::default() - }]), - ..Default::default() - }]), - pod_selector: Some(console_label_selector), - policy_types: Some(vec!["Ingress".to_owned()]), - ..Default::default() - }), - }]); - } - network_policies -} - -fn create_console_external_certificate( - config: &super::Config, - mz: &Materialize, -) -> Option { - create_certificate( - config.default_certificate_specs.console_external.clone(), - mz, - mz.spec.console_external_certificate_spec.clone(), - mz.console_external_certificate_name(), - mz.console_external_certificate_secret_name(), - None, - CertificatePrivateKeyAlgorithm::Rsa, - Some(4096), - ) -} - -#[derive(Serialize)] -struct ConsoleAppConfig { - version: String, - auth: ConsoleAppConfigAuth, -} - -#[derive(Serialize)] -struct ConsoleAppConfigAuth { - mode: AuthenticatorKind, -} - -fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap { - let version: String = console_image_ref - .rsplitn(2, ':') - .next() - .expect("at least one chunk, even if empty") - .to_owned(); - let app_config_json = serde_json::to_string(&ConsoleAppConfig { - version, - auth: ConsoleAppConfigAuth { - mode: mz.spec.authenticator_kind, - }, - }) - .expect("known valid"); - ConfigMap { - binary_data: None, - data: Some(btreemap! { - "app-config.json".to_owned() => app_config_json, - }), - immutable: None, - metadata: mz.managed_resource_meta(mz.console_configmap_name()), - } -} - -fn create_console_deployment_object( - config: &super::Config, - mz: &Materialize, - console_image_ref: &str, -) -> Deployment { - let mut pod_template_labels = mz.default_labels(); - pod_template_labels.insert( - "materialize.cloud/name".to_owned(), - mz.console_deployment_name(), - ); - pod_template_labels.insert("app".to_owned(), "console".to_string()); - pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name()); - - let ports = vec![ContainerPort { - container_port: config.console_http_port.into(), - name: Some("http".into()), - protocol: Some("TCP".into()), - ..Default::default() - }]; - - let scheme = if issuer_ref_defined( - &config.default_certificate_specs.balancerd_external, - &mz.spec.balancerd_external_certificate_spec, - ) { - "https" - } else { - "http" - }; - let mut env = vec![EnvVar { - name: "MZ_ENDPOINT".to_string(), - value: Some(format!( - "{}://{}.{}.svc.cluster.local:{}", - scheme, - mz.balancerd_service_name(), - mz.namespace(), - config.balancerd_http_port, - )), - ..Default::default() - }]; - let mut volumes = vec![Volume { - name: "app-config".to_string(), - config_map: Some(ConfigMapVolumeSource { - name: mz.console_configmap_name(), - default_mode: Some(256), - optional: Some(false), - items: Some(vec![KeyToPath { - key: "app-config.json".to_string(), - path: "app-config.json".to_string(), - ..Default::default() - }]), - }), - ..Default::default() - }]; - let mut volume_mounts = vec![VolumeMount { - name: "app-config".to_string(), - mount_path: "/usr/share/nginx/html/app-config".to_string(), - ..Default::default() - }]; - - let scheme = if issuer_ref_defined( - &config.default_certificate_specs.console_external, - &mz.spec.console_external_certificate_spec, - ) { - volumes.push(Volume { - name: "external-certificate".to_owned(), - secret: Some(SecretVolumeSource { - default_mode: Some(0o400), - secret_name: Some(mz.console_external_certificate_secret_name()), - items: None, - optional: Some(false), - }), - ..Default::default() - }); - volume_mounts.push(VolumeMount { - name: "external-certificate".to_owned(), - mount_path: "/nginx/tls".to_owned(), - read_only: Some(true), - ..Default::default() - }); - env.push(EnvVar { - name: "MZ_NGINX_LISTENER_CONFIG".to_string(), - value: Some(format!( - "listen {} ssl; -ssl_certificate /nginx/tls/tls.crt; -ssl_certificate_key /nginx/tls/tls.key;", - config.console_http_port - )), - ..Default::default() - }); - Some("HTTPS".to_owned()) - } else { - env.push(EnvVar { - name: "MZ_NGINX_LISTENER_CONFIG".to_string(), - value: Some(format!("listen {};", config.console_http_port)), - ..Default::default() - }); - Some("HTTP".to_owned()) - }; - - let probe = Probe { - http_get: Some(HTTPGetAction { - path: Some("/".to_string()), - port: IntOrString::Int(config.console_http_port.into()), - scheme, - ..Default::default() - }), - ..Default::default() - }; - - let security_context = if config.enable_security_context { - // Since we want to adhere to the most restrictive security context, all - // of these fields have to be set how they are. - // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted - Some(SecurityContext { - run_as_non_root: Some(true), - capabilities: Some(Capabilities { - drop: Some(vec!["ALL".to_string()]), - ..Default::default() - }), - seccomp_profile: Some(SeccompProfile { - type_: "RuntimeDefault".to_string(), - ..Default::default() - }), - allow_privilege_escalation: Some(false), - ..Default::default() - }) - } else { - None - }; - - let container = Container { - name: "console".to_owned(), - image: Some(console_image_ref.to_string()), - image_pull_policy: Some(config.image_pull_policy.to_string()), - ports: Some(ports), - env: Some(env), - startup_probe: Some(Probe { - period_seconds: Some(1), - failure_threshold: Some(10), - ..probe.clone() - }), - readiness_probe: Some(Probe { - period_seconds: Some(30), - failure_threshold: Some(1), - ..probe.clone() - }), - liveness_probe: Some(Probe { - period_seconds: Some(30), - ..probe.clone() - }), - resources: mz - .spec - .console_resource_requirements - .clone() - .or_else(|| config.console_default_resources.clone()), - security_context, - volume_mounts: Some(volume_mounts), - ..Default::default() - }; - - let deployment_spec = DeploymentSpec { - replicas: Some(mz.console_replicas()), - selector: LabelSelector { - match_labels: Some(pod_template_labels.clone()), - ..Default::default() - }, - template: PodTemplateSpec { - // not using managed_resource_meta because the pod should be - // owned by the deployment, not the materialize instance - metadata: Some(ObjectMeta { - labels: Some(pod_template_labels), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![container], - node_selector: Some( - config - .console_node_selector - .iter() - .map(|selector| (selector.key.clone(), selector.value.clone())) - .collect(), - ), - affinity: config.console_affinity.clone(), - tolerations: config.console_tolerations.clone(), - scheduler_name: config.scheduler_name.clone(), - service_account_name: Some(mz.service_account_name()), - volumes: Some(volumes), - security_context: Some(PodSecurityContext { - fs_group: Some(101), - ..Default::default() - }), - ..Default::default() - }), - }, - ..Default::default() - }; - - Deployment { - metadata: ObjectMeta { - ..mz.managed_resource_meta(mz.console_deployment_name()) - }, - spec: Some(deployment_spec), - status: None, - } -} - -fn create_console_service_object(config: &super::Config, mz: &Materialize) -> Service { - let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()}; - - let ports = vec![ServicePort { - name: Some("http".to_string()), - protocol: Some("TCP".to_string()), - port: config.console_http_port.into(), - target_port: Some(IntOrString::Int(config.console_http_port.into())), - ..Default::default() - }]; - - let spec = ServiceSpec { - type_: Some("ClusterIP".to_string()), - cluster_ip: Some("None".to_string()), - selector: Some(selector), - ports: Some(ports), - ..Default::default() - }; - - Service { - metadata: mz.managed_resource_meta(mz.console_service_name()), - spec: Some(spec), - status: None, - } -} diff --git a/src/orchestratord/src/k8s.rs b/src/orchestratord/src/k8s.rs index ccc7bcac0f06c..2688fccb78284 100644 --- a/src/orchestratord/src/k8s.rs +++ b/src/orchestratord/src/k8s.rs @@ -98,6 +98,10 @@ pub async fn register_crds( crds: vec![crd::balancer::v1alpha1::Balancer::crd()], stored_version: String::from("v1alpha1"), }, + VersionedCrd { + crds: vec![crd::console::v1alpha1::Console::crd()], + stored_version: String::from("v1alpha1"), + }, VersionedCrd { crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()], stored_version: String::from("v1"),