Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 72 additions & 2 deletions src/cloud-resources/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,82 @@

//! Kubernetes custom resources

use std::collections::BTreeMap;
use std::time::Duration;

use futures::future::join_all;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::{Patch, PatchParams};
use kube::{Api, Client};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::{
Api, Client, Resource, ResourceExt,
api::{ObjectMeta, Patch, PatchParams},
core::crd::merge_crds,
runtime::{conditions, wait::await_condition},
};
use rand::{Rng, distr::Uniform};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};

use crate::crd::generated::cert_manager::certificates::{
CertificateIssuerRef, CertificateSecretTemplate,
};
use mz_ore::retry::Retry;

pub mod generated;
pub mod materialize;
#[cfg(feature = "vpc-endpoints")]
pub mod vpc_endpoint;

// This is intentionally a subset of the fields of a Certificate.
// We do not want customers to configure options that may conflict with
// things we override or expand in our code.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
/// Additional DNS names the certificate will be valid for.
pub dns_names: Option<Vec<String>>,
/// Duration the certificate will be requested for.
/// Value must be in units accepted by Go
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
pub duration: Option<String>,
/// Duration before expiration the certificate will be renewed.
/// Value must be in units accepted by Go
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
pub renew_before: Option<String>,
/// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
pub issuer_ref: Option<CertificateIssuerRef>,
/// Additional annotations and labels to include in the Certificate object.
pub secret_template: Option<CertificateSecretTemplate>,
}

pub trait ManagedResource: Resource<DynamicType = ()> + Sized {
fn default_labels(&self) -> BTreeMap<String, String> {
BTreeMap::new()
}

fn managed_resource_meta(&self, name: String) -> ObjectMeta {
ObjectMeta {
namespace: Some(self.meta().namespace.clone().unwrap()),
name: Some(name),
labels: Some(self.default_labels()),
owner_references: Some(vec![owner_reference(self)]),
..Default::default()
}
}
}

fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
OwnerReference {
api_version: T::api_version(&()).to_string(),
kind: T::kind(&()).to_string(),
name: t.name_unchecked(),
uid: t.uid().unwrap(),
block_owner_deletion: Some(true),
..Default::default()
}
}

#[derive(Debug, Clone)]
pub struct VersionedCrd {
pub crds: Vec<CustomResourceDefinition>,
Expand Down Expand Up @@ -99,3 +156,16 @@ async fn register_custom_resource(
info!("Done registering {} crd", &crd_name);
Ok(())
}

pub fn new_resource_id() -> String {
// DNS-1035 names are supposed to be case insensitive,
// so we define our own character set, rather than use the
// built-in Alphanumeric distribution from rand, which
// includes both upper and lowercase letters.
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
rand::rng()
.sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
.take(10)
.map(|i| char::from(CHARSET[i]))
.collect()
}
102 changes: 24 additions & 78 deletions src/cloud-resources/src/crd/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,24 @@ use k8s_openapi::{
api::core::v1::{EnvVar, ResourceRequirements},
apimachinery::pkg::{
api::resource::Quantity,
apis::meta::v1::{Condition, OwnerReference, Time},
apis::meta::v1::{Condition, Time},
},
};
use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
use rand::Rng;
use rand::distr::Uniform;
use kube::{CustomResource, Resource, ResourceExt};
use schemars::JsonSchema;
use semver::Version;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
use mz_server_core::listeners::AuthenticatorKind;

use crate::crd::generated::cert_manager::certificates::{
CertificateIssuerRef, CertificateSecretTemplate,
};

pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
"materialize.cloud/last-known-active-generation";

pub mod v1alpha1 {

use super::*;

// This is intentionally a subset of the fields of a Certificate.
// We do not want customers to configure options that may conflict with
// things we override or expand in our code.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
/// Additional DNS names the certificate will be valid for.
pub dns_names: Option<Vec<String>>,
/// Duration the certificate will be requested for.
/// Value must be in units accepted by Go
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
pub duration: Option<String>,
/// Duration before expiration the certificate will be renewed.
/// Value must be in units accepted by Go
/// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
pub renew_before: Option<String>,
/// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
pub issuer_ref: Option<CertificateIssuerRef>,
/// Additional annotations and labels to include in the Certificate object.
pub secret_template: Option<CertificateSecretTemplate>,
}
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
/// Create a new generation of pods, leaving the old generation around until the
Expand Down Expand Up @@ -368,23 +341,6 @@ pub mod v1alpha1 {
})
}

pub fn default_labels(&self) -> BTreeMap<String, String> {
BTreeMap::from_iter([
(
"materialize.cloud/organization-name".to_owned(),
self.name_unchecked(),
),
(
"materialize.cloud/organization-namespace".to_owned(),
self.namespace(),
),
(
"materialize.cloud/mz-resource-id".to_owned(),
self.resource_id().to_owned(),
),
])
}

pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
format!(
"{}-{}-{}-0",
Expand Down Expand Up @@ -479,29 +435,11 @@ pub mod v1alpha1 {
}
}

pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
ObjectMeta {
namespace: Some(self.namespace()),
name: Some(name),
labels: Some(self.default_labels()),
owner_references: Some(vec![owner_reference(self)]),
..Default::default()
}
}

pub fn status(&self) -> MaterializeStatus {
self.status.clone().unwrap_or_else(|| {
let mut status = MaterializeStatus::default();
// DNS-1035 names are supposed to be case insensitive,
// so we define our own character set, rather than use the
// built-in Alphanumeric distribution from rand, which
// includes both upper and lowercase letters.
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
status.resource_id = rand::rng()
.sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
.take(10)
.map(|i| char::from(CHARSET[i]))
.collect();

status.resource_id = new_resource_id();

// If we're creating the initial status on an un-soft-deleted
// Environment we need to ensure that the last active generation
Expand Down Expand Up @@ -552,6 +490,25 @@ pub mod v1alpha1 {
a != b
}
}

impl ManagedResource for Materialize {
fn default_labels(&self) -> BTreeMap<String, String> {
BTreeMap::from_iter([
(
"materialize.cloud/organization-name".to_owned(),
self.name_unchecked(),
),
(
"materialize.cloud/organization-namespace".to_owned(),
self.namespace(),
),
(
"materialize.cloud/mz-resource-id".to_owned(),
self.resource_id().to_owned(),
),
])
}
}
}

fn parse_image_ref(image_ref: &str) -> Option<Version> {
Expand All @@ -568,17 +525,6 @@ fn parse_image_ref(image_ref: &str) -> Option<Version> {
})
}

fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
OwnerReference {
api_version: T::api_version(&()).to_string(),
kind: T::kind(&()).to_string(),
name: t.name_unchecked(),
uid: t.uid().unwrap(),
block_owner_deletion: Some(true),
..Default::default()
}
}

#[cfg(test)]
mod tests {
use kube::core::ObjectMeta;
Expand Down
2 changes: 1 addition & 1 deletion src/orchestratord/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ chrono = { version = "0.4.39", default-features = false }
clap = { version = "4.5.23", features = ["derive"] }
futures = "0.3.31"
http = "1.2.0"
k8s-controller = "0.6.1"
k8s-controller = "0.8.0"
k8s-openapi = { version = "0.26.0", features = ["v1_31"] }
kube = { version = "2.0.1", default-features = false, features = ["client", "runtime", "ws"] }
maplit = "1.0.2"
Expand Down
Loading