Skip to content

Commit 587bcb6

Browse files
Merge pull request #34337 from MaterializeInc/push-wtyxknkkzltv
a few orchestratord refactors in preparation for adding additional controllers
2 parents be8b438 + 6ecac28 commit 587bcb6

File tree

12 files changed

+612
-485
lines changed

12 files changed

+612
-485
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cloud-resources/src/crd.rs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,82 @@
99

1010
//! Kubernetes custom resources
1111
12+
use std::collections::BTreeMap;
1213
use std::time::Duration;
1314

1415
use futures::future::join_all;
1516
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
16-
use kube::api::{Patch, PatchParams};
17-
use kube::{Api, Client};
17+
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
1818
use kube::{
19+
Api, Client, Resource, ResourceExt,
20+
api::{ObjectMeta, Patch, PatchParams},
1921
core::crd::merge_crds,
2022
runtime::{conditions, wait::await_condition},
2123
};
24+
use rand::{Rng, distr::Uniform};
25+
use schemars::JsonSchema;
26+
use serde::{Deserialize, Serialize};
2227
use tracing::{info, warn};
2328

29+
use crate::crd::generated::cert_manager::certificates::{
30+
CertificateIssuerRef, CertificateSecretTemplate,
31+
};
2432
use mz_ore::retry::Retry;
2533

2634
pub mod generated;
2735
pub mod materialize;
2836
#[cfg(feature = "vpc-endpoints")]
2937
pub mod vpc_endpoint;
3038

39+
// This is intentionally a subset of the fields of a Certificate.
40+
// We do not want customers to configure options that may conflict with
41+
// things we override or expand in our code.
42+
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
43+
#[serde(rename_all = "camelCase")]
44+
pub struct MaterializeCertSpec {
45+
/// Additional DNS names the certificate will be valid for.
46+
pub dns_names: Option<Vec<String>>,
47+
/// Duration the certificate will be requested for.
48+
/// Value must be in units accepted by Go
49+
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
50+
pub duration: Option<String>,
51+
/// Duration before expiration the certificate will be renewed.
52+
/// Value must be in units accepted by Go
53+
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
54+
pub renew_before: Option<String>,
55+
/// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
56+
pub issuer_ref: Option<CertificateIssuerRef>,
57+
/// Additional annotations and labels to include in the Certificate object.
58+
pub secret_template: Option<CertificateSecretTemplate>,
59+
}
60+
61+
pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
62+
fn default_labels(&self) -> BTreeMap<String, String> {
63+
BTreeMap::new()
64+
}
65+
66+
fn managed_resource_meta(&self, name: String) -> ObjectMeta {
67+
ObjectMeta {
68+
namespace: Some(self.meta().namespace.clone().unwrap()),
69+
name: Some(name),
70+
labels: Some(self.default_labels()),
71+
owner_references: Some(vec![owner_reference(self)]),
72+
..Default::default()
73+
}
74+
}
75+
}
76+
77+
fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
78+
OwnerReference {
79+
api_version: T::api_version(&()).to_string(),
80+
kind: T::kind(&()).to_string(),
81+
name: t.name_unchecked(),
82+
uid: t.uid().unwrap(),
83+
block_owner_deletion: Some(true),
84+
..Default::default()
85+
}
86+
}
87+
3188
#[derive(Debug, Clone)]
3289
pub struct VersionedCrd {
3390
pub crds: Vec<CustomResourceDefinition>,
@@ -99,3 +156,16 @@ async fn register_custom_resource(
99156
info!("Done registering {} crd", &crd_name);
100157
Ok(())
101158
}
159+
160+
pub fn new_resource_id() -> String {
161+
// DNS-1035 names are supposed to be case insensitive,
162+
// so we define our own character set, rather than use the
163+
// built-in Alphanumeric distribution from rand, which
164+
// includes both upper and lowercase letters.
165+
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
166+
rand::rng()
167+
.sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
168+
.take(10)
169+
.map(|i| char::from(CHARSET[i]))
170+
.collect()
171+
}

src/cloud-resources/src/crd/materialize.rs

Lines changed: 24 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -13,51 +13,24 @@ use k8s_openapi::{
1313
api::core::v1::{EnvVar, ResourceRequirements},
1414
apimachinery::pkg::{
1515
api::resource::Quantity,
16-
apis::meta::v1::{Condition, OwnerReference, Time},
16+
apis::meta::v1::{Condition, Time},
1717
},
1818
};
19-
use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20-
use rand::Rng;
21-
use rand::distr::Uniform;
19+
use kube::{CustomResource, Resource, ResourceExt};
2220
use schemars::JsonSchema;
2321
use semver::Version;
2422
use serde::{Deserialize, Serialize};
2523
use uuid::Uuid;
2624

25+
use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
2726
use mz_server_core::listeners::AuthenticatorKind;
2827

29-
use crate::crd::generated::cert_manager::certificates::{
30-
CertificateIssuerRef, CertificateSecretTemplate,
31-
};
32-
3328
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
3429
"materialize.cloud/last-known-active-generation";
3530

3631
pub mod v1alpha1 {
37-
3832
use super::*;
3933

40-
// This is intentionally a subset of the fields of a Certificate.
41-
// We do not want customers to configure options that may conflict with
42-
// things we override or expand in our code.
43-
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44-
#[serde(rename_all = "camelCase")]
45-
pub struct MaterializeCertSpec {
46-
/// Additional DNS names the certificate will be valid for.
47-
pub dns_names: Option<Vec<String>>,
48-
/// Duration the certificate will be requested for.
49-
/// Value must be in units accepted by Go
50-
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
51-
pub duration: Option<String>,
52-
/// Duration before expiration the certificate will be renewed.
53-
/// Value must be in units accepted by Go
54-
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
55-
pub renew_before: Option<String>,
56-
/// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
57-
pub issuer_ref: Option<CertificateIssuerRef>,
58-
/// Additional annotations and labels to include in the Certificate object.
59-
pub secret_template: Option<CertificateSecretTemplate>,
60-
}
6134
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
6235
pub enum MaterializeRolloutStrategy {
6336
/// Create a new generation of pods, leaving the old generation around until the
@@ -368,23 +341,6 @@ pub mod v1alpha1 {
368341
})
369342
}
370343

371-
pub fn default_labels(&self) -> BTreeMap<String, String> {
372-
BTreeMap::from_iter([
373-
(
374-
"materialize.cloud/organization-name".to_owned(),
375-
self.name_unchecked(),
376-
),
377-
(
378-
"materialize.cloud/organization-namespace".to_owned(),
379-
self.namespace(),
380-
),
381-
(
382-
"materialize.cloud/mz-resource-id".to_owned(),
383-
self.resource_id().to_owned(),
384-
),
385-
])
386-
}
387-
388344
pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
389345
format!(
390346
"{}-{}-{}-0",
@@ -544,29 +500,11 @@ pub mod v1alpha1 {
544500
}
545501
}
546502

547-
pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
548-
ObjectMeta {
549-
namespace: Some(self.namespace()),
550-
name: Some(name),
551-
labels: Some(self.default_labels()),
552-
owner_references: Some(vec![owner_reference(self)]),
553-
..Default::default()
554-
}
555-
}
556-
557503
pub fn status(&self) -> MaterializeStatus {
558504
self.status.clone().unwrap_or_else(|| {
559505
let mut status = MaterializeStatus::default();
560-
// DNS-1035 names are supposed to be case insensitive,
561-
// so we define our own character set, rather than use the
562-
// built-in Alphanumeric distribution from rand, which
563-
// includes both upper and lowercase letters.
564-
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
565-
status.resource_id = rand::rng()
566-
.sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
567-
.take(10)
568-
.map(|i| char::from(CHARSET[i]))
569-
.collect();
506+
507+
status.resource_id = new_resource_id();
570508

571509
// If we're creating the initial status on an un-soft-deleted
572510
// Environment we need to ensure that the last active generation
@@ -626,6 +564,25 @@ pub mod v1alpha1 {
626564
a != b
627565
}
628566
}
567+
568+
impl ManagedResource for Materialize {
569+
fn default_labels(&self) -> BTreeMap<String, String> {
570+
BTreeMap::from_iter([
571+
(
572+
"materialize.cloud/organization-name".to_owned(),
573+
self.name_unchecked(),
574+
),
575+
(
576+
"materialize.cloud/organization-namespace".to_owned(),
577+
self.namespace(),
578+
),
579+
(
580+
"materialize.cloud/mz-resource-id".to_owned(),
581+
self.resource_id().to_owned(),
582+
),
583+
])
584+
}
585+
}
629586
}
630587

631588
fn parse_image_ref(image_ref: &str) -> Option<Version> {
@@ -642,17 +599,6 @@ fn parse_image_ref(image_ref: &str) -> Option<Version> {
642599
})
643600
}
644601

645-
fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
646-
OwnerReference {
647-
api_version: T::api_version(&()).to_string(),
648-
kind: T::kind(&()).to_string(),
649-
name: t.name_unchecked(),
650-
uid: t.uid().unwrap(),
651-
block_owner_deletion: Some(true),
652-
..Default::default()
653-
}
654-
}
655-
656602
#[cfg(test)]
657603
mod tests {
658604
use kube::core::ObjectMeta;

src/orchestratord/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ chrono = { version = "0.4.39", default-features = false }
1717
clap = { version = "4.5.23", features = ["derive"] }
1818
futures = "0.3.31"
1919
http = "1.2.0"
20-
k8s-controller = "0.6.1"
20+
k8s-controller = "0.8.0"
2121
k8s-openapi = { version = "0.26.0", features = ["v1_31"] }
2222
kube = { version = "2.0.1", default-features = false, features = ["client", "runtime", "ws"] }
2323
maplit = "1.0.2"

0 commit comments

Comments
 (0)