Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/helm-charts/operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ rules:
resources:
- materializes
- materializes/status
- balancers
- balancers/status
- vpcendpoints
verbs:
- create
Expand Down
1 change: 1 addition & 0 deletions src/cloud-resources/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::crd::generated::cert_manager::certificates::{
};
use mz_ore::retry::Retry;

pub mod balancer;
pub mod generated;
pub mod materialize;
#[cfg(feature = "vpc-endpoints")]
Expand Down
159 changes: 159 additions & 0 deletions src/cloud-resources/src/crd/balancer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use anyhow::bail;
use k8s_openapi::{
api::core::v1::ResourceRequirements, apimachinery::pkg::apis::meta::v1::Condition,
};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};

pub mod v1alpha1 {
use super::*;

#[derive(Clone, Debug)]
pub enum Routing<'a> {
Static(&'a StaticRoutingConfig),
Frontegg(&'a FronteggRoutingConfig),
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticRoutingConfig {
pub environmentd_namespace: String,
pub environmentd_service_name: String,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FronteggRoutingConfig {
// TODO
}

#[derive(
CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
#[kube(
namespaced,
group = "materialize.cloud",
version = "v1alpha1",
kind = "Balancer",
singular = "balancer",
plural = "balancers",
status = "BalancerStatus",
printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.balancerdImageRef", "priority": 1}"#,
printcolumn = r#"{"name": "Ready", "type": "string", "description": "Whether the deployment is ready", "jsonPath": ".status.conditions[?(@.type==\"Ready\")].status", "priority": 1}"#
)]
pub struct BalancerSpec {
/// The balancerd image to run.
pub balancerd_image_ref: String,
// Resource requirements for the balancerd pod
pub resource_requirements: Option<ResourceRequirements>,
// Number of balancerd pods to create
pub replicas: Option<i32>,
// The configuration for generating an x509 certificate using cert-manager for balancerd
// to present to incoming connections.
// The dns_names and issuer_ref fields are required.
pub external_certificate_spec: Option<MaterializeCertSpec>,
// The configuration for generating an x509 certificate using cert-manager for balancerd
// to use to communicate with environmentd.
// The dns_names and issuer_ref fields are required.
pub internal_certificate_spec: Option<MaterializeCertSpec>,
// Annotations to apply to the pods
pub pod_annotations: Option<BTreeMap<String, String>>,
// Labels to apply to the pods
pub pod_labels: Option<BTreeMap<String, String>>,

// Configuration for statically routing traffic
pub static_routing: Option<StaticRoutingConfig>,
// Configuration for routing traffic via Frontegg
pub frontegg_routing: Option<FronteggRoutingConfig>,

// This can be set to override the randomly chosen resource id
pub resource_id: Option<String>,
}

impl Balancer {
pub fn name_prefixed(&self, suffix: &str) -> String {
format!("mz{}-{}", self.resource_id(), suffix)
}

pub fn resource_id(&self) -> &str {
&self.status.as_ref().unwrap().resource_id
}

pub fn deployment_name(&self) -> String {
self.name_prefixed("balancerd")
}

pub fn replicas(&self) -> i32 {
self.spec.replicas.unwrap_or(2)
}

pub fn app_name(&self) -> String {
"balancerd".to_owned()
}

pub fn service_name(&self) -> String {
self.name_prefixed("balancerd")
}

pub fn external_certificate_name(&self) -> String {
self.name_prefixed("balancerd-external")
}

pub fn external_certificate_secret_name(&self) -> String {
self.name_prefixed("balancerd-external-tls")
}

pub fn routing(&self) -> anyhow::Result<Routing<'_>> {
match (&self.spec.static_routing, &self.spec.frontegg_routing) {
(Some(config), None) => Ok(Routing::Static(config)),
(None, Some(config)) => Ok(Routing::Frontegg(config)),
(None, None) => bail!("no routing configuration present"),
_ => bail!("multiple routing configurations present"),
}
}

pub fn status(&self) -> BalancerStatus {
self.status.clone().unwrap_or_else(|| BalancerStatus {
resource_id: self
.spec
.resource_id
.clone()
.unwrap_or_else(new_resource_id),
conditions: vec![],
})
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct BalancerStatus {
/// Resource identifier used as a name prefix to avoid pod name collisions.
pub resource_id: String,

pub conditions: Vec<Condition>,
}

impl ManagedResource for Balancer {
fn default_labels(&self) -> BTreeMap<String, String> {
BTreeMap::from_iter([(
"materialize.cloud/mz-resource-id".to_owned(),
self.resource_id().to_owned(),
)])
}
}
}
68 changes: 58 additions & 10 deletions src/orchestratord/src/bin/orchestratord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ use std::{

use http::HeaderValue;
use k8s_openapi::{
api::core::v1::{Affinity, ResourceRequirements, Toleration},
api::{
apps::v1::Deployment,
core::v1::{Affinity, ResourceRequirements, Service, Toleration},
},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
};
use kube::runtime::watcher;
use kube::{Api, runtime::watcher};
use mz_cloud_provider::CloudProvider;
use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
use tracing::info;

use mz_build_info::{BuildInfo, build_info};
Expand Down Expand Up @@ -318,7 +322,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
aws_secrets_controller_tags: args.aws_secrets_controller_tags,
environmentd_availability_zones: args.environmentd_availability_zones,
ephemeral_volume_class: args.ephemeral_volume_class,
scheduler_name: args.scheduler_name,
scheduler_name: args.scheduler_name.clone(),
enable_security_context: args.enable_security_context,
enable_internal_statement_logging: args.enable_internal_statement_logging,
disable_statement_logging: args.disable_statement_logging,
Expand All @@ -330,10 +334,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
clusterd_node_selector: args.clusterd_node_selector,
clusterd_affinity: args.clusterd_affinity,
clusterd_tolerations: args.clusterd_tolerations,
balancerd_node_selector: args.balancerd_node_selector,
balancerd_affinity: args.balancerd_affinity,
balancerd_tolerations: args.balancerd_tolerations,
balancerd_default_resources: args.balancerd_default_resources,
console_node_selector: args.console_node_selector,
console_affinity: args.console_affinity,
console_tolerations: args.console_tolerations,
Expand Down Expand Up @@ -373,11 +373,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
environmentd_internal_http_port: args.environmentd_internal_http_port,
environmentd_internal_persist_pubsub_port: args
.environmentd_internal_persist_pubsub_port,
balancerd_sql_port: args.balancerd_sql_port,
balancerd_http_port: args.balancerd_http_port,
balancerd_internal_http_port: args.balancerd_internal_http_port,
console_http_port: args.console_http_port,
default_certificate_specs: args.default_certificate_specs,
default_certificate_specs: args.default_certificate_specs.clone(),
disable_license_key_checks: args.disable_license_key_checks,
tracing: args.tracing,
orchestratord_namespace: namespace,
Expand All @@ -391,6 +389,56 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
.run(),
);

mz_ore::task::spawn(
|| "balancer controller",
k8s_controller::Controller::namespaced_all(
client.clone(),
controller::balancer::Context::new(
controller::balancer::Config {
enable_security_context: args.enable_security_context,
enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
image_pull_policy: args.image_pull_policy,
scheduler_name: args.scheduler_name,
balancerd_node_selector: args.balancerd_node_selector,
balancerd_affinity: args.balancerd_affinity,
balancerd_tolerations: args.balancerd_tolerations,
balancerd_default_resources: args.balancerd_default_resources,
default_certificate_specs: args.default_certificate_specs,
environmentd_sql_port: args.environmentd_sql_port,
environmentd_http_port: args.environmentd_http_port,
balancerd_sql_port: args.balancerd_sql_port,
balancerd_http_port: args.balancerd_http_port,
balancerd_internal_http_port: args.balancerd_internal_http_port,
},
client.clone(),
)
.await,
watcher::Config::default().timeout(29),
)
.with_controller(|controller| {
controller
.owns(
Api::<Deployment>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<Service>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<Certificate>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
})
.run(),
);

info!("All tasks started successfully.");

future::pending().await
Expand Down
1 change: 1 addition & 0 deletions src/orchestratord/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod balancer;
pub mod materialize;
Loading