Skip to content

Commit 4a2fa0a

Browse files
add a balancer crd and operator
1 parent e81e9c1 commit 4a2fa0a

File tree

7 files changed

+732
-8
lines changed

7 files changed

+732
-8
lines changed

misc/helm-charts/operator/templates/clusterrole.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ rules:
8888
resources:
8989
- materializes
9090
- materializes/status
91+
- balancers
92+
- balancers/status
9193
- vpcendpoints
9294
verbs:
9395
- create

src/cloud-resources/src/crd.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::crd::generated::cert_manager::certificates::{
3131
};
3232
use mz_ore::retry::Retry;
3333

34+
pub mod balancer;
3435
pub mod generated;
3536
pub mod materialize;
3637
#[cfg(feature = "vpc-endpoints")]
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
use std::collections::BTreeMap;
11+
12+
use anyhow::bail;
13+
use k8s_openapi::{
14+
api::core::v1::ResourceRequirements, apimachinery::pkg::apis::meta::v1::Condition,
15+
};
16+
use kube::CustomResource;
17+
use schemars::JsonSchema;
18+
use serde::{Deserialize, Serialize};
19+
20+
use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
21+
22+
pub mod v1alpha1 {
23+
use super::*;
24+
25+
#[derive(Clone, Debug)]
26+
pub enum Routing<'a> {
27+
Static(&'a StaticRoutingConfig),
28+
Frontegg(&'a FronteggRoutingConfig),
29+
}
30+
31+
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
32+
#[serde(rename_all = "camelCase")]
33+
pub struct StaticRoutingConfig {
34+
pub environmentd_namespace: String,
35+
pub environmentd_service_name: String,
36+
}
37+
38+
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
39+
#[serde(rename_all = "camelCase")]
40+
pub struct FronteggRoutingConfig {
41+
// TODO
42+
}
43+
44+
#[derive(
45+
CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
46+
)]
47+
#[serde(rename_all = "camelCase")]
48+
#[kube(
49+
namespaced,
50+
group = "materialize.cloud",
51+
version = "v1alpha1",
52+
kind = "Balancer",
53+
singular = "balancer",
54+
plural = "balancers",
55+
status = "BalancerStatus",
56+
printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.balancerdImageRef", "priority": 1}"#,
57+
printcolumn = r#"{"name": "Ready", "type": "string", "description": "Whether the deployment is ready", "jsonPath": ".status.conditions[?(@.type==\"Ready\")].status", "priority": 1}"#
58+
)]
59+
pub struct BalancerSpec {
60+
/// The balancerd image to run.
61+
pub balancerd_image_ref: String,
62+
// Resource requirements for the balancerd pod
63+
pub resource_requirements: Option<ResourceRequirements>,
64+
// Number of balancerd pods to create
65+
pub replicas: Option<i32>,
66+
// The configuration for generating an x509 certificate using cert-manager for balancerd
67+
// to present to incoming connections.
68+
// The dns_names and issuer_ref fields are required.
69+
pub external_certificate_spec: Option<MaterializeCertSpec>,
70+
// The configuration for generating an x509 certificate using cert-manager for balancerd
71+
// to use to communicate with environmentd.
72+
// The dns_names and issuer_ref fields are required.
73+
pub internal_certificate_spec: Option<MaterializeCertSpec>,
74+
// Annotations to apply to the pods
75+
pub pod_annotations: Option<BTreeMap<String, String>>,
76+
// Labels to apply to the pods
77+
pub pod_labels: Option<BTreeMap<String, String>>,
78+
79+
// Configuration for statically routing traffic
80+
pub static_routing: Option<StaticRoutingConfig>,
81+
// Configuration for routing traffic via Frontegg
82+
pub frontegg_routing: Option<FronteggRoutingConfig>,
83+
84+
// This can be set to override the randomly chosen resource id
85+
pub resource_id: Option<String>,
86+
}
87+
88+
impl Balancer {
89+
pub fn name_prefixed(&self, suffix: &str) -> String {
90+
format!("mz{}-{}", self.resource_id(), suffix)
91+
}
92+
93+
pub fn resource_id(&self) -> &str {
94+
&self.status.as_ref().unwrap().resource_id
95+
}
96+
97+
pub fn deployment_name(&self) -> String {
98+
self.name_prefixed("balancerd")
99+
}
100+
101+
pub fn replicas(&self) -> i32 {
102+
self.spec.replicas.unwrap_or(2)
103+
}
104+
105+
pub fn app_name(&self) -> String {
106+
"balancerd".to_owned()
107+
}
108+
109+
pub fn service_name(&self) -> String {
110+
self.name_prefixed("balancerd")
111+
}
112+
113+
pub fn external_certificate_name(&self) -> String {
114+
self.name_prefixed("balancerd-external")
115+
}
116+
117+
pub fn external_certificate_secret_name(&self) -> String {
118+
self.name_prefixed("balancerd-external-tls")
119+
}
120+
121+
pub fn routing(&self) -> anyhow::Result<Routing<'_>> {
122+
match (&self.spec.static_routing, &self.spec.frontegg_routing) {
123+
(Some(config), None) => Ok(Routing::Static(config)),
124+
(None, Some(config)) => Ok(Routing::Frontegg(config)),
125+
(None, None) => bail!("no routing configuration present"),
126+
_ => bail!("multiple routing configurations present"),
127+
}
128+
}
129+
130+
pub fn status(&self) -> BalancerStatus {
131+
self.status.clone().unwrap_or_else(|| BalancerStatus {
132+
resource_id: self
133+
.spec
134+
.resource_id
135+
.clone()
136+
.unwrap_or_else(new_resource_id),
137+
conditions: vec![],
138+
})
139+
}
140+
}
141+
142+
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
143+
#[serde(rename_all = "camelCase")]
144+
pub struct BalancerStatus {
145+
/// Resource identifier used as a name prefix to avoid pod name collisions.
146+
pub resource_id: String,
147+
148+
pub conditions: Vec<Condition>,
149+
}
150+
151+
impl ManagedResource for Balancer {
152+
fn default_labels(&self) -> BTreeMap<String, String> {
153+
BTreeMap::from_iter([(
154+
"materialize.cloud/mz-resource-id".to_owned(),
155+
self.resource_id().to_owned(),
156+
)])
157+
}
158+
}
159+
}

src/orchestratord/src/bin/orchestratord.rs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ use std::{
1515

1616
use http::HeaderValue;
1717
use k8s_openapi::{
18-
api::core::v1::{Affinity, ResourceRequirements, Toleration},
18+
api::{
19+
apps::v1::Deployment,
20+
core::v1::{Affinity, ResourceRequirements, Service, Toleration},
21+
},
1922
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
2023
};
21-
use kube::runtime::watcher;
24+
use kube::{Api, runtime::watcher};
2225
use mz_cloud_provider::CloudProvider;
26+
use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
2327
use tracing::info;
2428

2529
use mz_build_info::{BuildInfo, build_info};
@@ -318,7 +322,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
318322
aws_secrets_controller_tags: args.aws_secrets_controller_tags,
319323
environmentd_availability_zones: args.environmentd_availability_zones,
320324
ephemeral_volume_class: args.ephemeral_volume_class,
321-
scheduler_name: args.scheduler_name,
325+
scheduler_name: args.scheduler_name.clone(),
322326
enable_security_context: args.enable_security_context,
323327
enable_internal_statement_logging: args.enable_internal_statement_logging,
324328
disable_statement_logging: args.disable_statement_logging,
@@ -330,10 +334,10 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
330334
clusterd_node_selector: args.clusterd_node_selector,
331335
clusterd_affinity: args.clusterd_affinity,
332336
clusterd_tolerations: args.clusterd_tolerations,
333-
balancerd_node_selector: args.balancerd_node_selector,
334-
balancerd_affinity: args.balancerd_affinity,
335-
balancerd_tolerations: args.balancerd_tolerations,
336-
balancerd_default_resources: args.balancerd_default_resources,
337+
balancerd_node_selector: args.balancerd_node_selector.clone(),
338+
balancerd_affinity: args.balancerd_affinity.clone(),
339+
balancerd_tolerations: args.balancerd_tolerations.clone(),
340+
balancerd_default_resources: args.balancerd_default_resources.clone(),
337341
console_node_selector: args.console_node_selector,
338342
console_affinity: args.console_affinity,
339343
console_tolerations: args.console_tolerations,
@@ -377,7 +381,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
377381
balancerd_http_port: args.balancerd_http_port,
378382
balancerd_internal_http_port: args.balancerd_internal_http_port,
379383
console_http_port: args.console_http_port,
380-
default_certificate_specs: args.default_certificate_specs,
384+
default_certificate_specs: args.default_certificate_specs.clone(),
381385
disable_license_key_checks: args.disable_license_key_checks,
382386
tracing: args.tracing,
383387
orchestratord_namespace: namespace,
@@ -391,6 +395,56 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
391395
.run(),
392396
);
393397

398+
mz_ore::task::spawn(
399+
|| "balancer controller",
400+
k8s_controller::Controller::namespaced_all(
401+
client.clone(),
402+
controller::balancer::Context::new(
403+
controller::balancer::Config {
404+
enable_security_context: args.enable_security_context,
405+
enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
406+
image_pull_policy: args.image_pull_policy,
407+
scheduler_name: args.scheduler_name,
408+
balancerd_node_selector: args.balancerd_node_selector,
409+
balancerd_affinity: args.balancerd_affinity,
410+
balancerd_tolerations: args.balancerd_tolerations,
411+
balancerd_default_resources: args.balancerd_default_resources,
412+
default_certificate_specs: args.default_certificate_specs,
413+
environmentd_sql_port: args.environmentd_sql_port,
414+
environmentd_http_port: args.environmentd_http_port,
415+
balancerd_sql_port: args.balancerd_sql_port,
416+
balancerd_http_port: args.balancerd_http_port,
417+
balancerd_internal_http_port: args.balancerd_internal_http_port,
418+
},
419+
client.clone(),
420+
)
421+
.await,
422+
watcher::Config::default().timeout(29),
423+
)
424+
.with_controller(|controller| {
425+
controller
426+
.owns(
427+
Api::<Deployment>::all(client.clone()),
428+
watcher::Config::default()
429+
.labels("materialize.cloud/mz-resource-id")
430+
.timeout(29),
431+
)
432+
.owns(
433+
Api::<Service>::all(client.clone()),
434+
watcher::Config::default()
435+
.labels("materialize.cloud/mz-resource-id")
436+
.timeout(29),
437+
)
438+
.owns(
439+
Api::<Certificate>::all(client.clone()),
440+
watcher::Config::default()
441+
.labels("materialize.cloud/mz-resource-id")
442+
.timeout(29),
443+
)
444+
})
445+
.run(),
446+
);
447+
394448
info!("All tasks started successfully.");
395449

396450
future::pending().await

src/orchestratord/src/controller.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
pub mod balancer;
1011
pub mod materialize;

0 commit comments

Comments
 (0)