From 1c444590174c96d868b4eac7eba07af7050dff00 Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 23 Sep 2025 14:13:54 +0200 Subject: [PATCH 01/15] feat: Add CRD maintainer --- Cargo.lock | 1 + crates/stackable-operator/Cargo.toml | 3 +- .../stackable-operator/src/crd/maintainer.rs | 177 +++++++++++++++ crates/stackable-operator/src/crd/mod.rs | 2 + crates/stackable-webhook/src/constants.rs | 21 -- crates/stackable-webhook/src/lib.rs | 12 +- crates/stackable-webhook/src/options.rs | 14 +- .../src/servers/conversion.rs | 202 ++---------------- 8 files changed, 224 insertions(+), 208 deletions(-) create mode 100644 crates/stackable-operator/src/crd/maintainer.rs delete mode 100644 crates/stackable-webhook/src/constants.rs diff --git a/Cargo.lock b/Cargo.lock index 31e9df73b..e5947b8d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2966,6 +2966,7 @@ dependencies = [ "tracing-appender", "tracing-subscriber", "url", + "x509-cert", ] [[package]] diff --git a/crates/stackable-operator/Cargo.toml b/crates/stackable-operator/Cargo.toml index ca3b68a50..3deef516f 100644 --- a/crates/stackable-operator/Cargo.toml +++ b/crates/stackable-operator/Cargo.toml @@ -12,7 +12,7 @@ full = ["certs", "telemetry", "versioned", "time", "webhook", "clap"] default = ["telemetry", "versioned", "clap"] clap = [] -certs = ["dep:stackable-certs"] +certs = ["dep:stackable-certs", "dep:x509-cert"] telemetry = ["dep:stackable-telemetry"] time = ["stackable-shared/time"] versioned = ["dep:stackable-versioned"] @@ -53,6 +53,7 @@ tracing.workspace = true tracing-appender.workspace = true tracing-subscriber.workspace = true url.workspace = true +x509-cert = { workspace = true, optional = true } [dev-dependencies] rstest.workspace = true diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs new file mode 100644 index 000000000..9a1aa3d63 --- /dev/null +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -0,0 +1,177 @@ +use k8s_openapi::{ + ByteString, + apiextensions_apiserver::pkg::apis::apiextensions::v1::{ + CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, + WebhookConversion, + }, +}; +use kube::{ + Api, Client, ResourceExt, + api::{Patch, PatchParams}, +}; +use snafu::{ResultExt, Snafu}; +use tokio::sync::{mpsc, oneshot}; +use x509_cert::{ + Certificate, + der::{EncodePem, pem::LineEnding}, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("failed to encode CA certificate as PEM format"))] + EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, + + #[snafu(display("failed to send initial CRD reconcile heartbeat"))] + SendInitialReconcileHeartbeat, + + #[snafu(display("failed to patch CRD {crd_name:?}"))] + PatchCrd { + source: kube::Error, + crd_name: String, + }, +} + +/// Maintains various custom resource definitions. +/// +/// When running this, the following operations are done: +/// +/// - Apply the CRDs when starting up +/// - Reconcile the CRDs when the conversion webhook certificate is rotated +pub struct CustomResourceDefinitionMaintainer { + client: Client, + certificate_rx: mpsc::Receiver, + + definitions: Vec, + options: CustomResourceDefinitionMaintainerOptions, + + initial_reconcile_tx: Option>, +} + +impl CustomResourceDefinitionMaintainer { + /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more + /// custom resource definitions. + pub fn new( + client: Client, + certificate_rx: mpsc::Receiver, + definitions: impl IntoIterator, + options: CustomResourceDefinitionMaintainerOptions, + ) -> (Self, oneshot::Receiver<()>) { + let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); + let initial_reconcile_tx = Some(initial_reconcile_tx); + + let maintainer = Self { + definitions: definitions.into_iter().collect(), + initial_reconcile_tx, + certificate_rx, + options, + client, + }; + + (maintainer, initial_reconcile_rx) + } + + pub async fn run(mut self) -> Result<(), Error> { + let CustomResourceDefinitionMaintainerOptions { + operator_service_name, + operator_namespace, + field_manager, + https_port, + disabled, + } = self.options; + + // If the maintainer is disabled, immediately return without doing any work. + if disabled { + return Ok(()); + } + + while let Some(certificate) = self.certificate_rx.recv().await { + tracing::info!( + k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::>(), + "reconciling custom resource definitions" + ); + + let ca_bundle = certificate + .to_pem(LineEnding::LF) + .context(EncodeCertificateAuthorityAsPemSnafu)?; + + let crd_api: Api = Api::all(self.client.clone()); + + for mut crd in self.definitions.iter().cloned() { + let crd_kind = &crd.spec.names.kind; + let crd_name = crd.name_any(); + + tracing::debug!( + k8s.crd.kind = crd_kind, + k8s.crd.name = crd_name, + "reconciling custom resource definition" + ); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_owned(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook. + // The first version in the list understood by the API server is sent to the webhook. + // The webhook must respond with a ConversionReview object in the same version it received. + conversion_review_versions: vec!["v1".to_owned()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: operator_service_name.clone(), + namespace: operator_namespace.clone(), + path: Some(format!("/convert/{crd_name}")), + port: Some(https_port.into()), + }), + ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), + url: None, + }), + }), + }); + + let patch = Patch::Apply(&crd); + let patch_params = PatchParams::apply(&field_manager); + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| PatchCrdSnafu { crd_name })?; + } + + // Once all CRDs are reconciled, send a heartbeat for consumers to be notified that + // custom resources of these kinds can bow be deployed. + if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() { + initial_reconcile_tx + .send(()) + .ignore_context(SendInitialReconcileHeartbeatSnafu)? + } + } + + Ok(()) + } +} + +// TODO (@Techassi): Make this a builder instead +pub struct CustomResourceDefinitionMaintainerOptions { + operator_service_name: String, + operator_namespace: String, + field_manager: String, + https_port: u16, + disabled: bool, +} + +trait ResultContextExt { + fn ignore_context(self, context: C) -> Result + where + C: snafu::IntoError, + E2: std::error::Error + snafu::ErrorCompat; +} + +impl ResultContextExt for Result { + fn ignore_context(self, context: C) -> Result + where + C: snafu::IntoError, + E2: std::error::Error + snafu::ErrorCompat, + { + match self { + Ok(v) => Ok(v), + Err(_) => Err(context.into_error(snafu::NoneError)), + } + } +} diff --git a/crates/stackable-operator/src/crd/mod.rs b/crates/stackable-operator/src/crd/mod.rs index 3beb69aa8..791233267 100644 --- a/crates/stackable-operator/src/crd/mod.rs +++ b/crates/stackable-operator/src/crd/mod.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; pub mod authentication; pub mod git_sync; pub mod listener; +#[cfg(feature = "certs")] +pub mod maintainer; pub mod s3; /// A reference to a product cluster (for example, a `ZookeeperCluster`) diff --git a/crates/stackable-webhook/src/constants.rs b/crates/stackable-webhook/src/constants.rs deleted file mode 100644 index bd5a49c27..000000000 --- a/crates/stackable-webhook/src/constants.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Contains various constant definitions, mostly for default ports and IP -//! addresses. -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - -/// The default HTTPS port `8443` -pub const DEFAULT_HTTPS_PORT: u16 = 8443; - -// The HTTPS port the conversion webhook runs at -pub const CONVERSION_WEBHOOK_HTTPS_PORT: u16 = DEFAULT_HTTPS_PORT; - -/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, -/// which represents binding on all network addresses. -// -// TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4 -// and IPv6. However, it's complicated and depends on the underlying system... -// If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults. -pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - -/// The default socket address `0.0.0.0:8443` the webhook server binds to. -pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = - SocketAddr::new(DEFAULT_LISTEN_ADDRESS, DEFAULT_HTTPS_PORT); diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index c98515e1e..37ecbbd6f 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -26,6 +26,8 @@ //! enable complete control over these details if needed. //! //! [1]: crate::servers::ConversionWebhookServer +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use axum::{Router, routing::get}; use futures_util::{FutureExt as _, pin_mut, select}; use snafu::{ResultExt, Snafu}; @@ -40,7 +42,6 @@ use x509_cert::Certificate; // use tower_http::trace::TraceLayer; use crate::tls::TlsServer; -pub mod constants; pub mod options; pub mod servers; pub mod tls; @@ -86,6 +87,15 @@ pub struct WebhookServer { } impl WebhookServer { + /// The default HTTPS port `8443` + pub const DEFAULT_HTTPS_PORT: u16 = 8443; + /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, + /// which represents binding on all network addresses. + pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + /// The default socket address `0.0.0.0:8443` the webhook server binds to. + pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = + SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); + /// Creates a new ready-to-use webhook server. /// /// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs index 90623b093..7685c6163 100644 --- a/crates/stackable-webhook/src/options.rs +++ b/crates/stackable-webhook/src/options.rs @@ -6,7 +6,7 @@ use std::{ use stackable_certs::PrivateKeyType; -use crate::constants::DEFAULT_SOCKET_ADDRESS; +use crate::WebhookServer; /// Specifies available webhook server options. /// @@ -82,7 +82,9 @@ impl WebhookOptionsBuilder { /// Sets the IP address of the socket address the webhook server uses to /// bind for HTTPS. pub fn bind_ip(mut self, bind_ip: impl Into) -> Self { - let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS); + let addr = self + .socket_addr + .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); addr.set_ip(bind_ip.into()); self } @@ -90,7 +92,9 @@ impl WebhookOptionsBuilder { /// Sets the port of the socket address the webhook server uses to bind /// for HTTPS. pub fn bind_port(mut self, bind_port: u16) -> Self { - let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS); + let addr = self + .socket_addr + .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); addr.set_port(bind_port); self } @@ -119,7 +123,9 @@ impl WebhookOptionsBuilder { /// explicitly set option. pub fn build(self) -> WebhookOptions { WebhookOptions { - socket_addr: self.socket_addr.unwrap_or(DEFAULT_SOCKET_ADDRESS), + socket_addr: self + .socket_addr + .unwrap_or(WebhookServer::DEFAULT_SOCKET_ADDRESS), subject_alterative_dns_names: self.subject_alterative_dns_names, } } diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index ca83f6078..7c0c85fd3 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -1,34 +1,19 @@ use std::{fmt::Debug, net::SocketAddr}; use axum::{Json, Router, routing::post}; -use k8s_openapi::{ - ByteString, - apiextensions_apiserver::pkg::apis::apiextensions::v1::{ - CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, - WebhookConversion, - }, -}; +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; +use kube::ResourceExt; // Re-export this type because users of the conversion webhook server require // this type to write the handler function. Instead of importing this type from // kube directly, consumers can use this type instead. This also eliminates // keeping the kube dependency version in sync between here and the operator. pub use kube::core::conversion::ConversionReview; -use kube::{ - Api, Client, ResourceExt, - api::{Patch, PatchParams}, -}; -use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::{sync::mpsc, try_join}; +use snafu::{ResultExt, Snafu}; +use tokio::sync::mpsc; use tracing::instrument; -use x509_cert::{ - Certificate, - der::{EncodePem, pem::LineEnding}, -}; +use x509_cert::Certificate; -use crate::{ - WebhookError, WebhookHandler, WebhookServer, constants::CONVERSION_WEBHOOK_HTTPS_PORT, - options::WebhookOptions, -}; +use crate::{WebhookError, WebhookHandler, WebhookServer, options::WebhookOptions}; #[derive(Debug, Snafu)] pub enum ConversionWebhookError { @@ -77,31 +62,20 @@ pub struct ConversionWebhookOptions { /// The name of the Kubernetes service which points to the operator/webhook. pub service_name: String, - - /// If the CRDs should be maintained automatically. Use the (negated) value from - /// `stackable_operator::cli::ProductOperatorRun::disable_crd_maintenance` - /// for this. - // # Because of https://github.com/rust-lang/cargo/issues/3475 we can not use a real link here - pub maintain_crds: bool, - - /// The field manager used to apply Kubernetes objects, typically the operator name, e.g. - /// `airflow-operator`. - pub field_manager: String, } /// A ready-to-use CRD conversion webhook server. /// /// See [`ConversionWebhookServer::new()`] for usage examples. -pub struct ConversionWebhookServer { - crds: Vec, - options: ConversionWebhookOptions, - router: Router, - client: Client, -} +pub struct ConversionWebhookServer(WebhookServer); impl ConversionWebhookServer { + /// The default socket address the conversion webhook server binds to, see + /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`]. + pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS; + /// Creates a new conversion webhook server, which expects POST requests being made to the - /// `/convert/{crd name}` endpoint. + /// `/convert/{CRD_NAME}` endpoint. /// /// You need to provide a few things for every CRD passed in via the `crds_and_handlers` argument: /// @@ -172,15 +146,11 @@ impl ConversionWebhookServer { /// conversion_webhook.run().await.expect("failed to run ConversionWebhookServer"); /// # } /// ``` - #[instrument( - name = "create_conversion_webhook_server", - skip(crds_and_handlers, client) - )] + #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] pub async fn new( crds_and_handlers: impl IntoIterator, options: ConversionWebhookOptions, - client: Client, - ) -> Result + ) -> Result<(Self, mpsc::Receiver), ConversionWebhookError> where H: WebhookHandler + Clone + Send + Sync + 'static, { @@ -200,34 +170,15 @@ impl ConversionWebhookServer { crds.push(crd); } - Ok(Self { - options, - router, - client, - crds, - }) - } - - pub async fn run(self) -> Result<(), ConversionWebhookError> { - tracing::info!("starting conversion webhook server"); - - let Self { - options, - router, - client, - crds, - } = self; - let ConversionWebhookOptions { socket_addr, namespace: operator_namespace, service_name: operator_service_name, - maintain_crds, - field_manager, } = &options; // This is how Kubernetes calls us, so it decides about the naming. // AFAIK we can not influence this, so this is the only SAN entry needed. + // FIXME (@Techassi): The cluster domain should be included here to form FQDN of the service let subject_alterative_dns_name = format!("{operator_service_name}.{operator_namespace}.svc",); @@ -236,127 +187,16 @@ impl ConversionWebhookServer { socket_addr: *socket_addr, }; - let (server, mut cert_rx) = WebhookServer::new(router, webhook_options) + let (server, certificate_rx) = WebhookServer::new(router, webhook_options) .await .context(CreateWebhookServerSnafu)?; - // We block the ConversionWebhookServer creation until the certificates have been generated. - // This way we - // 1. Are able to apply the CRDs before we start the actual controllers relying on them - // 2. Avoid updating them shortly after as cert have been generated. Doing so would cause - // unnecessary "too old resource version" errors in the controllers as the CRD was updated. - let current_cert = cert_rx - .recv() - .await - .context(ReceiveCertificateFromChannelSnafu)?; - - if *maintain_crds { - Self::reconcile_crds( - &client, - field_manager, - &crds, - operator_namespace, - operator_service_name, - current_cert, - ) - .await - .context(ReconcileCrdsSnafu)?; - - try_join!( - Self::run_webhook_server(server), - Self::run_crd_reconciliation_loop( - cert_rx, - &client, - field_manager, - &crds, - operator_namespace, - operator_service_name, - ), - )?; - } else { - Self::run_webhook_server(server).await?; - }; - - Ok(()) - } - - async fn run_webhook_server(server: WebhookServer) -> Result<(), ConversionWebhookError> { - server.run().await.context(RunWebhookServerSnafu) - } - - async fn run_crd_reconciliation_loop( - mut cert_rx: mpsc::Receiver, - client: &Client, - field_manager: &str, - crds: &[CustomResourceDefinition], - operator_namespace: &str, - operator_service_name: &str, - ) -> Result<(), ConversionWebhookError> { - while let Some(current_cert) = cert_rx.recv().await { - Self::reconcile_crds( - client, - field_manager, - crds, - operator_namespace, - operator_service_name, - current_cert, - ) - .await - .context(ReconcileCrdsSnafu)?; - } - Ok(()) + Ok((Self(server), certificate_rx)) } - #[instrument(skip_all)] - async fn reconcile_crds( - client: &Client, - field_manager: &str, - crds: &[CustomResourceDefinition], - operator_namespace: &str, - operator_service_name: &str, - current_cert: Certificate, - ) -> Result<(), ConversionWebhookError> { - tracing::info!( - crds = ?crds.iter().map(CustomResourceDefinition::name_any).collect::>(), - "Reconciling CRDs" - ); - let ca_bundle = current_cert - .to_pem(LineEnding::LF) - .context(ConvertCaToPemSnafu)?; - - let crd_api: Api = Api::all(client.clone()); - for mut crd in crds.iter().cloned() { - let crd_name = crd.name_any(); - - crd.spec.conversion = Some(CustomResourceConversion { - strategy: "Webhook".to_string(), - webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook. - // The first version in the list understood by the API server is sent to the webhook. - // The webhook must respond with a ConversionReview object in the same version it received. - conversion_review_versions: vec!["v1".to_string()], - client_config: Some(WebhookClientConfig { - service: Some(ServiceReference { - name: operator_service_name.to_owned(), - namespace: operator_namespace.to_owned(), - path: Some(format!("/convert/{crd_name}")), - port: Some(CONVERSION_WEBHOOK_HTTPS_PORT.into()), - }), - ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), - url: None, - }), - }), - }); - - let patch = Patch::Apply(&crd); - let patch_params = PatchParams::apply(field_manager); - crd_api - .patch(&crd_name, &patch_params, &patch) - .await - .with_context(|_| UpdateCrdSnafu { - crd_name: crd_name.to_string(), - })?; - } - Ok(()) + /// Runs the [`ConversionWebhookServer`] asynchronously. + pub async fn run(self) -> Result<(), ConversionWebhookError> { + tracing::info!("starting conversion webhook server"); + self.0.run().await.context(RunWebhookServerSnafu) } } From 41f86494621405cb8dd7fc02647afa431114e44e Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 23 Sep 2025 17:11:49 +0200 Subject: [PATCH 02/15] docs: Add/improve various (doc) comments --- .../stackable-operator/src/crd/maintainer.rs | 98 +++++++++++++------ crates/stackable-webhook/src/options.rs | 3 +- .../src/servers/conversion.rs | 93 +++++++----------- 3 files changed, 107 insertions(+), 87 deletions(-) diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 9a1aa3d63..74d6c18b5 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -50,6 +50,33 @@ pub struct CustomResourceDefinitionMaintainer { impl CustomResourceDefinitionMaintainer { /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more /// custom resource definitions. + /// + /// ## Parameters + /// + /// This function expects four parameters: + /// + /// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches + /// the CRDs when the TLS certificate is rotated. + /// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The + /// certificate data sent through the channel is used to set the caBundle in the conversion + /// section of the CRD. + /// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained + /// by this maintainer. If the iterator is empty, the maintainer returns early without doing + /// any work. As such, a polling mechanism which waits for all futures should be used to + /// prevent premature termination of the operator. + /// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various + /// parts of the maintainer. In the future, this will be converted to a builder, to enable a + /// cleaner API interface. + /// + /// ## Return Values + /// + /// This function returns a 2-tuple (pair) of values: + /// + /// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer. + /// See [`CustomResourceDefinitionMaintainer::run`] for more details. + /// - The [`oneshot::Receiver`] which will be used to send out a message once the initial + /// CRD reconciliation ran. This signal can be used to trigger the deployment of custom + /// resources defined by the maintained CRDs. pub fn new( client: Client, certificate_rx: mpsc::Receiver, @@ -70,26 +97,36 @@ impl CustomResourceDefinitionMaintainer { (maintainer, initial_reconcile_rx) } + /// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously. + /// + /// This needs to be polled in parallel with other parts of an operator, like controllers or + /// webhook servers. If it is disabled, the returned future immediately resolves to + /// [`std::task::Poll::Ready`] and thus doesn't consume any resources. pub async fn run(mut self) -> Result<(), Error> { let CustomResourceDefinitionMaintainerOptions { operator_service_name, operator_namespace, field_manager, - https_port, + webhook_https_port: https_port, disabled, } = self.options; - // If the maintainer is disabled, immediately return without doing any work. - if disabled { + // If the maintainer is disabled or there are no custom resource definitions, immediately + // return without doing any work. + if disabled || self.definitions.is_empty() { return Ok(()); } + // This get's polled by the async runtime on a regular basis (or when woken up). Once we + // receive a message containing the newly generated TLS certificate for the conversion + // webhook, we need to update the caBundle in the CRD. while let Some(certificate) = self.certificate_rx.recv().await { tracing::info!( k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::>(), "reconciling custom resource definitions" ); + // The caBundle needs to be provided as a base64-encoded PEM envelope. let ca_bundle = certificate .to_pem(LineEnding::LF) .context(EncodeCertificateAuthorityAsPemSnafu)?; @@ -109,9 +146,12 @@ impl CustomResourceDefinitionMaintainer { crd.spec.conversion = Some(CustomResourceConversion { strategy: "Webhook".to_owned(), webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook. - // The first version in the list understood by the API server is sent to the webhook. - // The webhook must respond with a ConversionReview object in the same version it received. + // conversionReviewVersions indicates what ConversionReview versions are + // supported by the webhook. The first version in the list understood by the + // API server is sent to the webhook. The webhook must respond with a + // ConversionReview object in the same version it received. We only support + // the stable v1 ConversionReview to keep the implementation as simple as + // possible. conversion_review_versions: vec!["v1".to_owned()], client_config: Some(WebhookClientConfig { service: Some(ServiceReference { @@ -120,12 +160,15 @@ impl CustomResourceDefinitionMaintainer { path: Some(format!("/convert/{crd_name}")), port: Some(https_port.into()), }), + // Here, ByteString takes care of encoding the provided content as + // base64. ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), url: None, }), }), }); + // Deploy the updated CRDs using a server-side apply. let patch = Patch::Apply(&crd); let patch_params = PatchParams::apply(&field_manager); crd_api @@ -134,12 +177,15 @@ impl CustomResourceDefinitionMaintainer { .with_context(|_| PatchCrdSnafu { crd_name })?; } - // Once all CRDs are reconciled, send a heartbeat for consumers to be notified that - // custom resources of these kinds can bow be deployed. + // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out + // via the oneshot channel. This channel can only be used exactly once. The sender's + // send method consumes self, and as such, the sender is wrapped in an Option to be + // able to call take to consume the inner value. if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() { - initial_reconcile_tx - .send(()) - .ignore_context(SendInitialReconcileHeartbeatSnafu)? + match initial_reconcile_tx.send(()) { + Ok(_) => {} + Err(_) => return SendInitialReconcileHeartbeatSnafu.fail(), + } } } @@ -148,30 +194,20 @@ impl CustomResourceDefinitionMaintainer { } // TODO (@Techassi): Make this a builder instead +/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. pub struct CustomResourceDefinitionMaintainerOptions { + /// The service name used by the operator/conversion webhook. operator_service_name: String, + + /// The namespace the operator/conversion webhook runs in. operator_namespace: String, + + /// The name of the field manager used for the server-side apply. field_manager: String, - https_port: u16, - disabled: bool, -} -trait ResultContextExt { - fn ignore_context(self, context: C) -> Result - where - C: snafu::IntoError, - E2: std::error::Error + snafu::ErrorCompat; -} + /// The HTTPS port the conversion webhook listens on. + webhook_https_port: u16, -impl ResultContextExt for Result { - fn ignore_context(self, context: C) -> Result - where - C: snafu::IntoError, - E2: std::error::Error + snafu::ErrorCompat, - { - match self { - Ok(v) => Ok(v), - Err(_) => Err(context.into_error(snafu::NoneError)), - } - } + /// Indicates if the maintainer should be disabled. + disabled: bool, } diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs index 7685c6163..b7eeaffea 100644 --- a/crates/stackable-webhook/src/options.rs +++ b/crates/stackable-webhook/src/options.rs @@ -1,4 +1,5 @@ -//! Contains available options to configure the [WebhookServer][crate::WebhookServer]. +//! Contains available options to configure the [WebhookServer]. + use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 7c0c85fd3..3382ca34a 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -74,76 +74,59 @@ impl ConversionWebhookServer { /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`]. pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS; - /// Creates a new conversion webhook server, which expects POST requests being made to the - /// `/convert/{CRD_NAME}` endpoint. + /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being + /// made to the `/convert/{CRD_NAME}` endpoint. /// - /// You need to provide a few things for every CRD passed in via the `crds_and_handlers` argument: + /// ## Parameters /// - /// 1. The CRD - /// 2. A conversion function to convert between CRD versions. Typically you would use the - /// the auto-generated `try_convert` function on CRD spec definition structs for this. - /// 3. A [`kube::Client`] used to create/update the CRDs. + /// This function expects the following parameters: /// - /// The [`ConversionWebhookServer`] takes care of reconciling the CRDs into the Kubernetes - /// cluster and takes care of adding itself as conversion webhook. This includes TLS - /// certificates and CA bundles. + /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] + /// to a handler function. In most cases, the generated `CustomResource::try_merge` function + /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` + /// signature. + /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the + /// webhook server, eg. the socket address used to listen on. /// - /// # Example + /// ## Return Values /// - /// ```no_run - /// use clap::Parser; - /// use stackable_webhook::{ - /// servers::{ConversionWebhookServer, ConversionWebhookOptions}, - /// constants::CONVERSION_WEBHOOK_HTTPS_PORT, - /// WebhookOptions - /// }; - /// use stackable_operator::{ - /// kube::Client, - /// crd::s3::{S3Connection, S3ConnectionVersion}, - /// cli::{RunArguments, MaintenanceOptions}, - /// }; + /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`] + /// variant: + /// + /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See + /// [`ConversionWebhookServer::run`] for more details. + /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly + /// generated TLS certificate. This channel is used by the CRD maintainer to trigger a + /// reconcile of the CRDs it maintains. /// - /// # async fn test() { - /// // Things that should already be in you operator: - /// const OPERATOR_NAME: &str = "product-operator"; - /// let client = Client::try_default().await.expect("failed to create Kubernetes client"); - /// let RunArguments { - /// operator_environment, - /// maintenance: MaintenanceOptions { - /// disable_crd_maintenance, - /// .. - /// }, - /// .. - /// } = RunArguments::parse(); + /// ## Example + /// + /// ``` + /// use stackable_webhook::{ConversionWebhookServer, ConversionWebhookOptions}; + /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; /// - /// let crds_and_handlers = [ + /// # #[tokio::test] + /// # async fn main() { + /// let crds_and_handlers = vec![ /// ( /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("failed to merge S3Connection CRD"), - /// S3Connection::try_convert as fn(_) -> _, - /// ), + /// .expect("the S3Connection CRD must be merged"), + /// S3Connection::try_convert, + /// ) /// ]; /// /// let options = ConversionWebhookOptions { - /// socket_addr: format!("0.0.0.0:{CONVERSION_WEBHOOK_HTTPS_PORT}") - /// .parse() - /// .expect("static address is always valid"), - /// namespace: operator_environment.operator_namespace, - /// service_name: operator_environment.operator_service_name, - /// maintain_crds: !disable_crd_maintenance, - /// field_manager: OPERATOR_NAME.to_owned(), + /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS, + /// namespace: "stackable-operators".to_owned(), + /// service_name: "product-operator".to_owned(), /// }; /// - /// // Construct the conversion webhook server - /// let conversion_webhook = ConversionWebhookServer::new( - /// crds_and_handlers, - /// options, - /// client, - /// ) - /// .await - /// .expect("failed to create ConversionWebhookServer"); + /// let (conversion_webhook_server, _certificate_rx) = + /// ConversionWebhookServer::new(crds_and_handlers, options) + /// .await + /// .unwrap(); /// - /// conversion_webhook.run().await.expect("failed to run ConversionWebhookServer"); + /// conversion_webhook_server.run().await.unwrap(); /// # } /// ``` #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] From 9aa8941fb186ef840d51c06c6d476ca8331bd141 Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 23 Sep 2025 17:41:04 +0200 Subject: [PATCH 03/15] fix: Use default crypto provider for TLS server --- crates/stackable-webhook/src/servers/conversion.rs | 10 ++++++---- crates/stackable-webhook/src/tls/cert_resolver.rs | 2 +- crates/stackable-webhook/src/tls/mod.rs | 13 +++++++++---- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index 3382ca34a..dbc01b1d1 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -101,12 +101,14 @@ impl ConversionWebhookServer { /// /// ## Example /// - /// ``` - /// use stackable_webhook::{ConversionWebhookServer, ConversionWebhookOptions}; + /// ```no_run + /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; + /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; /// - /// # #[tokio::test] + /// # #[tokio::main] /// # async fn main() { + /// # CryptoProvider::install_default(default_provider()).unwrap(); /// let crds_and_handlers = vec![ /// ( /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) @@ -179,7 +181,7 @@ impl ConversionWebhookServer { /// Runs the [`ConversionWebhookServer`] asynchronously. pub async fn run(self) -> Result<(), ConversionWebhookError> { - tracing::info!("starting conversion webhook server"); + tracing::info!("run conversion webhook server"); self.0.run().await.context(RunWebhookServerSnafu) } } diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index 6320c0ce8..0437830ea 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -44,7 +44,7 @@ pub enum CertificateResolverError { TokioSpawnBlocking { source: tokio::task::JoinError }, #[snafu(display("no default rustls CryptoProvider installed"))] - NoDefaultCryptoProviderInstalled {}, + NoDefaultCryptoProviderInstalled, } /// This struct serves as [`ResolvesServerCert`] to always hand out the current certificate for TLS diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index a796e8b38..142ce23a4 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -11,7 +11,7 @@ use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use opentelemetry_semantic_conventions as semconv; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_shared::time::Duration; use tokio::{ net::{TcpListener, TcpStream}, @@ -21,7 +21,7 @@ use tokio_rustls::{ TlsAcceptor, rustls::{ ServerConfig, - crypto::ring::default_provider, + crypto::CryptoProvider, version::{TLS12, TLS13}, }, }; @@ -59,6 +59,9 @@ pub enum TlsServerError { #[snafu(display("failed to set safe TLS protocol versions"))] SetSafeTlsProtocolVersions { source: tokio_rustls::rustls::Error }, + + #[snafu(display("no default rustls CryptoProvider installed"))] + NoDefaultCryptoProviderInstalled, } /// A server which terminates TLS connections and allows clients to communicate @@ -97,8 +100,10 @@ impl TlsServer { .context(CreateCertificateResolverSnafu)?; let cert_resolver = Arc::new(cert_resolver); - let tls_provider = default_provider(); - let mut config = ServerConfig::builder_with_provider(tls_provider.into()) + let tls_provider = + CryptoProvider::get_default().context(NoDefaultCryptoProviderInstalledSnafu)?; + + let mut config = ServerConfig::builder_with_provider(tls_provider.clone()) .with_protocol_versions(&[&TLS12, &TLS13]) .context(SetSafeTlsProtocolVersionsSnafu)? .with_no_client_auth() From 362eb4827ab83568b7a2056c3d27eb95a6e8b798 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 24 Sep 2025 16:45:11 +0200 Subject: [PATCH 04/15] chore(stackable-operator): Gate maintainer behind webhook feature --- crates/stackable-operator/Cargo.toml | 2 +- crates/stackable-operator/src/crd/mod.rs | 2 +- crates/stackable-webhook/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/stackable-operator/Cargo.toml b/crates/stackable-operator/Cargo.toml index 3deef516f..e70171e97 100644 --- a/crates/stackable-operator/Cargo.toml +++ b/crates/stackable-operator/Cargo.toml @@ -16,7 +16,7 @@ certs = ["dep:stackable-certs", "dep:x509-cert"] telemetry = ["dep:stackable-telemetry"] time = ["stackable-shared/time"] versioned = ["dep:stackable-versioned"] -webhook = ["dep:stackable-webhook"] +webhook = ["dep:stackable-webhook", "dep:x509-cert"] [dependencies] stackable-certs = { path = "../stackable-certs", optional = true } diff --git a/crates/stackable-operator/src/crd/mod.rs b/crates/stackable-operator/src/crd/mod.rs index 791233267..585999c08 100644 --- a/crates/stackable-operator/src/crd/mod.rs +++ b/crates/stackable-operator/src/crd/mod.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; pub mod authentication; pub mod git_sync; pub mod listener; -#[cfg(feature = "certs")] +#[cfg(feature = "webhook")] pub mod maintainer; pub mod s3; diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 37ecbbd6f..0ac3f2b31 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -37,7 +37,7 @@ use tokio::{ sync::mpsc, }; use tower::ServiceBuilder; -use x509_cert::Certificate; +pub use x509_cert::Certificate; // use tower_http::trace::TraceLayer; use crate::tls::TlsServer; From c8d06210180b31b85c4d88b887f7ed16d437c482 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 24 Sep 2025 16:49:27 +0200 Subject: [PATCH 05/15] fix(stackable-operator): Make options fields public --- crates/stackable-operator/src/crd/maintainer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 74d6c18b5..6c9041c89 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -197,17 +197,17 @@ impl CustomResourceDefinitionMaintainer { /// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. pub struct CustomResourceDefinitionMaintainerOptions { /// The service name used by the operator/conversion webhook. - operator_service_name: String, + pub operator_service_name: String, /// The namespace the operator/conversion webhook runs in. - operator_namespace: String, + pub operator_namespace: String, /// The name of the field manager used for the server-side apply. - field_manager: String, + pub field_manager: String, /// The HTTPS port the conversion webhook listens on. - webhook_https_port: u16, + pub webhook_https_port: u16, /// Indicates if the maintainer should be disabled. - disabled: bool, + pub disabled: bool, } From 2fc5c28d55939f5c72c77580531ad8a9f4ebe266 Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 25 Sep 2025 12:13:57 +0200 Subject: [PATCH 06/15] feat(stackable-operator): Add create_if_missing method to client Co-authored-by: Sebastian Bernauer --- crates/stackable-operator/src/client.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/stackable-operator/src/client.rs b/crates/stackable-operator/src/client.rs index f79a1eb91..b910eaba9 100644 --- a/crates/stackable-operator/src/client.rs +++ b/crates/stackable-operator/src/client.rs @@ -253,6 +253,25 @@ impl Client { }) } + /// Optionally creates a resource if it does not exist yet. + /// + /// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`]. + /// This function either returns the existing resource or the newly created one. + pub async fn create_if_missing(&self, resource: &T) -> Result + where + T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi, + ::DynamicType: Default, + { + if let Some(r) = self + .get_opt(&resource.name_any(), resource.get_namespace()) + .await? + { + return Ok(r); + } + + self.create(resource).await + } + /// Patches a resource using the `MERGE` patch strategy described /// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386) /// This will fail for objects that do not exist yet. From 59a9426540c1fa92f195d8087314a20657137ead Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 25 Sep 2025 12:34:54 +0200 Subject: [PATCH 07/15] chore(webhook): Add changelog entry --- crates/stackable-webhook/CHANGELOG.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index cd116e86d..9d8b0e602 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- BREAKING: The `ConversionWebhookServer` now returns a pair of values ([#1099]): + - The conversion webhook server itself + - A `mpsc::Receiver` to provide consumers the newly generated TLS certificate +- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on + `(Conversion)WebhookServer` instead of free-standing ones ([#1099]). + +### Removed + +- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions` + are removed ([#1099]). + +[#1099]: https://github.com/stackabletech/operator-rs/pull/1099 + ## [0.6.0] - 2025-09-09 ### Added From 77cb05b1a8d60809e7910a3f7e06c10d6bde0716 Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 25 Sep 2025 12:35:30 +0200 Subject: [PATCH 08/15] chore(operator): Add changelog entry --- crates/stackable-operator/CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index d0d385dd4..2fc5adf59 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file. ### Added +- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS + certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot` + channel which can be used to trigger creation/patching of any default custom resources deployed by + the operator ([#1099]). +- Add a `Client::create_if_missing` associated function to create a resource if it doesn't + exist ([#1099]). - Add end-of-support checker ([#1096]). - The EoS checker can be constructed using `EndOfSupportChecker::new()`. - Add new `MaintenanceOptions` and `EndOfSupportOptions` structs. @@ -24,6 +30,7 @@ All notable changes to this project will be documented in this file. [#1096]: https://github.com/stackabletech/operator-rs/pull/1096 [#1098]: https://github.com/stackabletech/operator-rs/pull/1098 +[#1099]: https://github.com/stackabletech/operator-rs/pull/1099 ## [0.98.0] - 2025-09-22 From 106dd162afe8c95364e87159b6773efbc0d94cb8 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 11:06:07 +0200 Subject: [PATCH 09/15] chore: Streamline feature gate --- Cargo.lock | 1 - crates/stackable-operator/Cargo.toml | 7 +++---- crates/stackable-operator/src/crd/maintainer.rs | 7 ++----- crates/stackable-webhook/Cargo.toml | 4 ++++ crates/stackable-webhook/src/lib.rs | 14 ++++++++++---- 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 019ca9a39..9ad1caf0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2948,7 +2948,6 @@ dependencies = [ "tracing-appender", "tracing-subscriber", "url", - "x509-cert", ] [[package]] diff --git a/crates/stackable-operator/Cargo.toml b/crates/stackable-operator/Cargo.toml index e70171e97..179e6209d 100644 --- a/crates/stackable-operator/Cargo.toml +++ b/crates/stackable-operator/Cargo.toml @@ -12,11 +12,11 @@ full = ["certs", "telemetry", "versioned", "time", "webhook", "clap"] default = ["telemetry", "versioned", "clap"] clap = [] -certs = ["dep:stackable-certs", "dep:x509-cert"] +certs = ["dep:stackable-certs"] telemetry = ["dep:stackable-telemetry"] time = ["stackable-shared/time"] versioned = ["dep:stackable-versioned"] -webhook = ["dep:stackable-webhook", "dep:x509-cert"] +webhook = ["dep:stackable-webhook"] [dependencies] stackable-certs = { path = "../stackable-certs", optional = true } @@ -24,7 +24,7 @@ stackable-operator-derive = { path = "../stackable-operator-derive" } stackable-shared = { path = "../stackable-shared" } stackable-telemetry = { path = "../stackable-telemetry", optional = true, features = ["clap"] } stackable-versioned = { path = "../stackable-versioned", optional = true } -stackable-webhook = { path = "../stackable-webhook", optional = true } +stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"]} chrono.workspace = true clap.workspace = true @@ -53,7 +53,6 @@ tracing.workspace = true tracing-appender.workspace = true tracing-subscriber.workspace = true url.workspace = true -x509-cert = { workspace = true, optional = true } [dev-dependencies] rstest.workspace = true diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 6c9041c89..7b36e391b 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -10,16 +10,13 @@ use kube::{ api::{Patch, PatchParams}, }; use snafu::{ResultExt, Snafu}; +use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding}; use tokio::sync::{mpsc, oneshot}; -use x509_cert::{ - Certificate, - der::{EncodePem, pem::LineEnding}, -}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("failed to encode CA certificate as PEM format"))] - EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, + EncodeCertificateAuthorityAsPem { source: x509_cert::Error }, #[snafu(display("failed to send initial CRD reconcile heartbeat"))] SendInitialReconcileHeartbeat, diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index bf8003316..e55a4becb 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -6,6 +6,10 @@ license.workspace = true edition.workspace = true repository.workspace = true +[features] +# Re-export selected items from the x509-cert crate needed by the CRD maintainer +maintainer = [] + [dependencies] stackable-certs = { path = "../stackable-certs", features = ["rustls"] } stackable-shared = { path = "../stackable-shared" } diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 0ac3f2b31..41c600a9f 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -28,6 +28,7 @@ //! [1]: crate::servers::ConversionWebhookServer use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use ::x509_cert::Certificate; use axum::{Router, routing::get}; use futures_util::{FutureExt as _, pin_mut, select}; use snafu::{ResultExt, Snafu}; @@ -37,17 +38,22 @@ use tokio::{ sync::mpsc, }; use tower::ServiceBuilder; -pub use x509_cert::Certificate; -// use tower_http::trace::TraceLayer; +// Selected re-exports +pub use crate::options::WebhookOptions; use crate::tls::TlsServer; pub mod options; pub mod servers; pub mod tls; -// Selected re-exports -pub use crate::options::WebhookOptions; +#[cfg(feature = "maintainer")] +pub mod x509_cert { + pub use ::x509_cert::{ + Certificate, + der::{EncodePem, Error, pem::LineEnding}, + }; +} /// A generic webhook handler receiving a request and sending back a response. /// From bbf49f5cb8fb10027f99aa62a8a46438ee86af86 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 11:06:28 +0200 Subject: [PATCH 10/15] docs(operator): Add example in doc comment --- .../stackable-operator/src/crd/maintainer.rs | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 7b36e391b..2ddd6023c 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -74,6 +74,45 @@ impl CustomResourceDefinitionMaintainer { /// - The [`oneshot::Receiver`] which will be used to send out a message once the initial /// CRD reconciliation ran. This signal can be used to trigger the deployment of custom /// resources defined by the maintained CRDs. + /// + /// ## Example + /// + /// ```no_run + /// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; + /// # use stackable_webhook::x509_cert::Certificate; + /// # use tokio::sync::mpsc::channel; + /// # use kube::Client; + /// use stackable_operator::crd::maintainer::{ + /// CustomResourceDefinitionMaintainerOptions, + /// CustomResourceDefinitionMaintainer, + /// }; + /// + /// # #[tokio::main] + /// # async fn main() { + /// # let (certificate_tx, certificate_rx) = channel(1); + /// let options = CustomResourceDefinitionMaintainerOptions { + /// operator_service_name: "my-service-name".to_owned(), + /// operator_namespace: "my-namespace".to_owned(), + /// field_manager: "my-operator".to_owned(), + /// webhook_https_port: 8443, + /// disabled: true, + /// }; + /// + /// let client = Client::try_default().await.unwrap(); + /// + /// let definitions = vec![ + /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), + /// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(), + /// ]; + /// + /// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( + /// client, + /// certificate_rx, + /// definitions, + /// options, + /// ); + /// # } + /// ``` pub fn new( client: Client, certificate_rx: mpsc::Receiver, From ac5b9cbe4faed1b465da1897eb2764f6cab3e21e Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 11:07:53 +0200 Subject: [PATCH 11/15] refactor(operator): Adjust oneshot channel handling --- crates/stackable-operator/src/crd/maintainer.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 2ddd6023c..5bdafa70d 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -41,7 +41,7 @@ pub struct CustomResourceDefinitionMaintainer { definitions: Vec, options: CustomResourceDefinitionMaintainerOptions, - initial_reconcile_tx: Option>, + initial_reconcile_tx: oneshot::Sender<()>, } impl CustomResourceDefinitionMaintainer { @@ -120,7 +120,6 @@ impl CustomResourceDefinitionMaintainer { options: CustomResourceDefinitionMaintainerOptions, ) -> (Self, oneshot::Receiver<()>) { let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); - let initial_reconcile_tx = Some(initial_reconcile_tx); let maintainer = Self { definitions: definitions.into_iter().collect(), @@ -153,6 +152,11 @@ impl CustomResourceDefinitionMaintainer { return Ok(()); } + // This channel can only be used exactly once. The sender's send method consumes self, and + // as such, the sender is wrapped in an Option to be able to call take to consume the inner + // value. + let mut initial_reconcile_tx = Some(self.initial_reconcile_tx); + // This get's polled by the async runtime on a regular basis (or when woken up). Once we // receive a message containing the newly generated TLS certificate for the conversion // webhook, we need to update the caBundle in the CRD. @@ -214,10 +218,8 @@ impl CustomResourceDefinitionMaintainer { } // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out - // via the oneshot channel. This channel can only be used exactly once. The sender's - // send method consumes self, and as such, the sender is wrapped in an Option to be - // able to call take to consume the inner value. - if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() { + // via the oneshot channel. + if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { match initial_reconcile_tx.send(()) { Ok(_) => {} Err(_) => return SendInitialReconcileHeartbeatSnafu.fail(), From d51491e82701dc972f888763cc607d9bb2de7600 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 11:17:04 +0200 Subject: [PATCH 12/15] chore: Apply suggestions Co-authored-by: Sebastian Bernauer --- crates/stackable-operator/CHANGELOG.md | 2 +- crates/stackable-operator/src/crd/maintainer.rs | 8 ++++---- crates/stackable-webhook/src/lib.rs | 4 ++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 2fc5adf59..94085db06 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -8,7 +8,7 @@ All notable changes to this project will be documented in this file. - Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot` - channel which can be used to trigger creation/patching of any default custom resources deployed by + channel which can for example be used to trigger creation/patching of any custom resources deployed by the operator ([#1099]). - Add a `Client::create_if_missing` associated function to create a resource if it doesn't exist ([#1099]). diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index 5bdafa70d..c2a5a7419 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -220,10 +220,10 @@ impl CustomResourceDefinitionMaintainer { // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out // via the oneshot channel. if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { - match initial_reconcile_tx.send(()) { - Ok(_) => {} - Err(_) => return SendInitialReconcileHeartbeatSnafu.fail(), - } + ensure!( + initial_reconcile_tx.send(()).is_ok(), + SendInitialReconcileHeartbeatSnafu + ); } } diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 41c600a9f..f437aed4d 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -97,6 +97,10 @@ impl WebhookServer { pub const DEFAULT_HTTPS_PORT: u16 = 8443; /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, /// which represents binding on all network addresses. + // + // TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4 + // and IPv6. However, it's complicated and depends on the underlying system... + // If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults. pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); /// The default socket address `0.0.0.0:8443` the webhook server binds to. pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = From d0c4a57c33cd8bc521e050aa66b3b17ebb0573d2 Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 11:37:04 +0200 Subject: [PATCH 13/15] chore(webhook): Update dev comment --- crates/stackable-webhook/src/servers/conversion.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index dbc01b1d1..b3467ad96 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -163,7 +163,8 @@ impl ConversionWebhookServer { // This is how Kubernetes calls us, so it decides about the naming. // AFAIK we can not influence this, so this is the only SAN entry needed. - // FIXME (@Techassi): The cluster domain should be included here to form FQDN of the service + // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) + // HTTP clients can use the FQDN of the service for testing or user use-cases. let subject_alterative_dns_name = format!("{operator_service_name}.{operator_namespace}.svc",); From 44a701aba82a49a6cce430b928a7061a9b1c201c Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 1 Oct 2025 14:21:16 +0200 Subject: [PATCH 14/15] fix(operator): Import ensure! macro --- crates/stackable-operator/src/crd/maintainer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-operator/src/crd/maintainer.rs index c2a5a7419..c3379c2e0 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-operator/src/crd/maintainer.rs @@ -9,7 +9,7 @@ use kube::{ Api, Client, ResourceExt, api::{Patch, PatchParams}, }; -use snafu::{ResultExt, Snafu}; +use snafu::{ResultExt, Snafu, ensure}; use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding}; use tokio::sync::{mpsc, oneshot}; From 99eae00f5090da3023369ff6571578c86ea8c5dd Mon Sep 17 00:00:00 2001 From: Techassi Date: Wed, 8 Oct 2025 16:39:23 +0200 Subject: [PATCH 15/15] feat: Add ConversionWebhookServer::with_maintainer --- crates/stackable-operator/Cargo.toml | 2 +- crates/stackable-operator/src/crd/mod.rs | 2 - crates/stackable-webhook/Cargo.toml | 4 - crates/stackable-webhook/src/lib.rs | 9 +- .../src}/maintainer.rs | 50 ++++++----- .../src/servers/conversion.rs | 83 ++++++++++++++++--- .../src/tls/cert_resolver.rs | 21 +++-- crates/stackable-webhook/src/tls/mod.rs | 6 +- 8 files changed, 113 insertions(+), 64 deletions(-) rename crates/{stackable-operator/src/crd => stackable-webhook/src}/maintainer.rs (87%) diff --git a/crates/stackable-operator/Cargo.toml b/crates/stackable-operator/Cargo.toml index 8205104e6..aecdee49f 100644 --- a/crates/stackable-operator/Cargo.toml +++ b/crates/stackable-operator/Cargo.toml @@ -24,7 +24,7 @@ stackable-operator-derive = { path = "../stackable-operator-derive" } stackable-shared = { path = "../stackable-shared", features = ["chrono", "time"] } stackable-telemetry = { path = "../stackable-telemetry", optional = true, features = ["clap"] } stackable-versioned = { path = "../stackable-versioned", optional = true } -stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"]} +stackable-webhook = { path = "../stackable-webhook", optional = true} chrono.workspace = true clap.workspace = true diff --git a/crates/stackable-operator/src/crd/mod.rs b/crates/stackable-operator/src/crd/mod.rs index 585999c08..3beb69aa8 100644 --- a/crates/stackable-operator/src/crd/mod.rs +++ b/crates/stackable-operator/src/crd/mod.rs @@ -7,8 +7,6 @@ use serde::{Deserialize, Serialize}; pub mod authentication; pub mod git_sync; pub mod listener; -#[cfg(feature = "webhook")] -pub mod maintainer; pub mod s3; /// A reference to a product cluster (for example, a `ZookeeperCluster`) diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index e55a4becb..bf8003316 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -6,10 +6,6 @@ license.workspace = true edition.workspace = true repository.workspace = true -[features] -# Re-export selected items from the x509-cert crate needed by the CRD maintainer -maintainer = [] - [dependencies] stackable-certs = { path = "../stackable-certs", features = ["rustls"] } stackable-shared = { path = "../stackable-shared" } diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index f437aed4d..ee33ab542 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -43,18 +43,11 @@ use tower::ServiceBuilder; pub use crate::options::WebhookOptions; use crate::tls::TlsServer; +pub mod maintainer; pub mod options; pub mod servers; pub mod tls; -#[cfg(feature = "maintainer")] -pub mod x509_cert { - pub use ::x509_cert::{ - Certificate, - der::{EncodePem, Error, pem::LineEnding}, - }; -} - /// A generic webhook handler receiving a request and sending back a response. /// /// This trait is not intended to be implemented by external crates and this diff --git a/crates/stackable-operator/src/crd/maintainer.rs b/crates/stackable-webhook/src/maintainer.rs similarity index 87% rename from crates/stackable-operator/src/crd/maintainer.rs rename to crates/stackable-webhook/src/maintainer.rs index c3379c2e0..649ac7da7 100644 --- a/crates/stackable-operator/src/crd/maintainer.rs +++ b/crates/stackable-webhook/src/maintainer.rs @@ -10,13 +10,16 @@ use kube::{ api::{Patch, PatchParams}, }; use snafu::{ResultExt, Snafu, ensure}; -use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding}; use tokio::sync::{mpsc, oneshot}; +use x509_cert::{ + Certificate, + der::{EncodePem, pem::LineEnding}, +}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("failed to encode CA certificate as PEM format"))] - EncodeCertificateAuthorityAsPem { source: x509_cert::Error }, + EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, #[snafu(display("failed to send initial CRD reconcile heartbeat"))] SendInitialReconcileHeartbeat, @@ -34,17 +37,17 @@ pub enum Error { /// /// - Apply the CRDs when starting up /// - Reconcile the CRDs when the conversion webhook certificate is rotated -pub struct CustomResourceDefinitionMaintainer { +pub struct CustomResourceDefinitionMaintainer<'a> { client: Client, certificate_rx: mpsc::Receiver, definitions: Vec, - options: CustomResourceDefinitionMaintainerOptions, + options: CustomResourceDefinitionMaintainerOptions<'a>, initial_reconcile_tx: oneshot::Sender<()>, } -impl CustomResourceDefinitionMaintainer { +impl<'a> CustomResourceDefinitionMaintainer<'a> { /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more /// custom resource definitions. /// @@ -79,10 +82,10 @@ impl CustomResourceDefinitionMaintainer { /// /// ```no_run /// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; - /// # use stackable_webhook::x509_cert::Certificate; /// # use tokio::sync::mpsc::channel; + /// # use x509_cert::Certificate; /// # use kube::Client; - /// use stackable_operator::crd::maintainer::{ + /// use stackable_webhook::maintainer::{ /// CustomResourceDefinitionMaintainerOptions, /// CustomResourceDefinitionMaintainer, /// }; @@ -91,9 +94,8 @@ impl CustomResourceDefinitionMaintainer { /// # async fn main() { /// # let (certificate_tx, certificate_rx) = channel(1); /// let options = CustomResourceDefinitionMaintainerOptions { - /// operator_service_name: "my-service-name".to_owned(), - /// operator_namespace: "my-namespace".to_owned(), - /// field_manager: "my-operator".to_owned(), + /// operator_name: "my-service-name", + /// operator_namespace: "my-namespace", /// webhook_https_port: 8443, /// disabled: true, /// }; @@ -117,7 +119,7 @@ impl CustomResourceDefinitionMaintainer { client: Client, certificate_rx: mpsc::Receiver, definitions: impl IntoIterator, - options: CustomResourceDefinitionMaintainerOptions, + options: CustomResourceDefinitionMaintainerOptions<'a>, ) -> (Self, oneshot::Receiver<()>) { let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); @@ -139,10 +141,9 @@ impl CustomResourceDefinitionMaintainer { /// [`std::task::Poll::Ready`] and thus doesn't consume any resources. pub async fn run(mut self) -> Result<(), Error> { let CustomResourceDefinitionMaintainerOptions { - operator_service_name, operator_namespace, - field_manager, - webhook_https_port: https_port, + webhook_https_port, + operator_name, disabled, } = self.options; @@ -173,7 +174,7 @@ impl CustomResourceDefinitionMaintainer { let crd_api: Api = Api::all(self.client.clone()); - for mut crd in self.definitions.iter().cloned() { + for crd in self.definitions.iter_mut() { let crd_kind = &crd.spec.names.kind; let crd_name = crd.name_any(); @@ -195,10 +196,10 @@ impl CustomResourceDefinitionMaintainer { conversion_review_versions: vec!["v1".to_owned()], client_config: Some(WebhookClientConfig { service: Some(ServiceReference { - name: operator_service_name.clone(), - namespace: operator_namespace.clone(), + name: operator_name.to_owned(), + namespace: operator_namespace.to_owned(), path: Some(format!("/convert/{crd_name}")), - port: Some(https_port.into()), + port: Some(webhook_https_port.into()), }), // Here, ByteString takes care of encoding the provided content as // base64. @@ -210,7 +211,7 @@ impl CustomResourceDefinitionMaintainer { // Deploy the updated CRDs using a server-side apply. let patch = Patch::Apply(&crd); - let patch_params = PatchParams::apply(&field_manager); + let patch_params = PatchParams::apply(operator_name); crd_api .patch(&crd_name, &patch_params, &patch) .await @@ -233,15 +234,12 @@ impl CustomResourceDefinitionMaintainer { // TODO (@Techassi): Make this a builder instead /// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. -pub struct CustomResourceDefinitionMaintainerOptions { - /// The service name used by the operator/conversion webhook. - pub operator_service_name: String, +pub struct CustomResourceDefinitionMaintainerOptions<'a> { + /// The service name used by the operator/conversion webhook and as a field manager. + pub operator_name: &'a str, /// The namespace the operator/conversion webhook runs in. - pub operator_namespace: String, - - /// The name of the field manager used for the server-side apply. - pub field_manager: String, + pub operator_namespace: &'a str, /// The HTTPS port the conversion webhook listens on. pub webhook_https_port: u16, diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index b3467ad96..9b4beab2e 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -2,18 +2,22 @@ use std::{fmt::Debug, net::SocketAddr}; use axum::{Json, Router, routing::post}; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; -use kube::ResourceExt; // Re-export this type because users of the conversion webhook server require // this type to write the handler function. Instead of importing this type from // kube directly, consumers can use this type instead. This also eliminates // keeping the kube dependency version in sync between here and the operator. pub use kube::core::conversion::ConversionReview; +use kube::{Client, ResourceExt}; use snafu::{ResultExt, Snafu}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::instrument; use x509_cert::Certificate; -use crate::{WebhookError, WebhookHandler, WebhookServer, options::WebhookOptions}; +use crate::{ + WebhookError, WebhookHandler, WebhookServer, + maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions}, + options::WebhookOptions, +}; #[derive(Debug, Snafu)] pub enum ConversionWebhookError { @@ -53,15 +57,15 @@ where // TODO: Add a builder, maybe with `bon`. #[derive(Debug)] -pub struct ConversionWebhookOptions { +pub struct ConversionWebhookOptions<'a> { /// The bind address to bind the HTTPS server to. pub socket_addr: SocketAddr, /// The namespace the operator/webhook is running in. - pub namespace: String, + pub namespace: &'a str, /// The name of the Kubernetes service which points to the operator/webhook. - pub service_name: String, + pub service_name: &'a str, } /// A ready-to-use CRD conversion webhook server. @@ -77,6 +81,8 @@ impl ConversionWebhookServer { /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being /// made to the `/convert/{CRD_NAME}` endpoint. /// + /// The TLS certificate is automatically generated and rotated. + /// /// ## Parameters /// /// This function expects the following parameters: @@ -119,8 +125,8 @@ impl ConversionWebhookServer { /// /// let options = ConversionWebhookOptions { /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS, - /// namespace: "stackable-operators".to_owned(), - /// service_name: "product-operator".to_owned(), + /// namespace: "stackable-operators", + /// service_name: "product-operator", /// }; /// /// let (conversion_webhook_server, _certificate_rx) = @@ -134,7 +140,7 @@ impl ConversionWebhookServer { #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] pub async fn new( crds_and_handlers: impl IntoIterator, - options: ConversionWebhookOptions, + options: ConversionWebhookOptions<'_>, ) -> Result<(Self, mpsc::Receiver), ConversionWebhookError> where H: WebhookHandler + Clone + Send + Sync + 'static, @@ -142,7 +148,7 @@ impl ConversionWebhookServer { tracing::debug!("create new conversion webhook server"); let mut router = Router::new(); - let mut crds = Vec::new(); + for (crd, handler) in crds_and_handlers { let crd_name = crd.name_any(); let handler_fn = |Json(review): Json| async { @@ -150,9 +156,9 @@ impl ConversionWebhookServer { Json(review) }; + // TODO (@Techassi): Make this part of the trait mentioned above let route = format!("/convert/{crd_name}"); router = router.route(&route, post(handler_fn)); - crds.push(crd); } let ConversionWebhookOptions { @@ -180,6 +186,61 @@ impl ConversionWebhookServer { Ok((Self(server), certificate_rx)) } + /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`], + /// and a [`oneshot::Receiver`]. + /// + /// See the referenced items for more details on usage. + pub async fn with_maintainer<'a, H>( + // TODO (@Techassi): Use a trait type here which can be used to build all part of the + // conversion webhook server and a CRD maintainer. + crds_and_handlers: impl IntoIterator + Clone, + operator_name: &'a str, + operator_namespace: &'a str, + disable_maintainer: bool, + client: Client, + ) -> Result< + ( + Self, + CustomResourceDefinitionMaintainer<'a>, + oneshot::Receiver<()>, + ), + ConversionWebhookError, + > + where + H: WebhookHandler + Clone + Send + Sync + 'static, + { + let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS; + + // TODO (@Techassi): These should be moved into a builder + let webhook_options = ConversionWebhookOptions { + namespace: operator_namespace, + service_name: operator_name, + socket_addr, + }; + + let (conversion_webhook_server, certificate_rx) = + Self::new(crds_and_handlers.clone(), webhook_options).await?; + + let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd); + + // TODO (@Techassi): These should be moved into a builder + let maintainer_options = CustomResourceDefinitionMaintainerOptions { + webhook_https_port: socket_addr.port(), + disabled: disable_maintainer, + operator_namespace, + operator_name, + }; + + let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( + client, + certificate_rx, + definitions, + maintainer_options, + ); + + Ok((conversion_webhook_server, maintainer, initial_reconcile_rx)) + } + /// Runs the [`ConversionWebhookServer`] asynchronously. pub async fn run(self) -> Result<(), ConversionWebhookError> { tracing::info!("run conversion webhook server"); diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index 0437830ea..d355b956c 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -59,23 +59,25 @@ pub struct CertificateResolver { current_certified_key: ArcSwap, subject_alterative_dns_names: Arc>, - cert_tx: mpsc::Sender, + certificate_tx: mpsc::Sender, } impl CertificateResolver { pub async fn new( subject_alterative_dns_names: Vec, - cert_tx: mpsc::Sender, + certificate_tx: mpsc::Sender, ) -> Result { let subject_alterative_dns_names = Arc::new(subject_alterative_dns_names); - let certified_key = - Self::generate_new_certificate_inner(subject_alterative_dns_names.clone(), &cert_tx) - .await?; + let certified_key = Self::generate_new_certificate_inner( + subject_alterative_dns_names.clone(), + &certificate_tx, + ) + .await?; Ok(Self { subject_alterative_dns_names, current_certified_key: ArcSwap::new(certified_key), - cert_tx, + certificate_tx, }) } @@ -90,7 +92,8 @@ impl CertificateResolver { async fn generate_new_certificate(&self) -> Result> { let subject_alterative_dns_names = self.subject_alterative_dns_names.clone(); - Self::generate_new_certificate_inner(subject_alterative_dns_names, &self.cert_tx).await + Self::generate_new_certificate_inner(subject_alterative_dns_names, &self.certificate_tx) + .await } /// Creates a new certificate and returns the certified key. @@ -102,7 +105,7 @@ impl CertificateResolver { /// See [the relevant decision](https://github.com/stackabletech/decisions/issues/56) async fn generate_new_certificate_inner( subject_alterative_dns_names: Arc>, - cert_tx: &mpsc::Sender, + certificate_tx: &mpsc::Sender, ) -> Result> { // The certificate generations can take a while, so we use `spawn_blocking` let (cert, certified_key) = tokio::task::spawn_blocking(move || { @@ -141,7 +144,7 @@ impl CertificateResolver { .await .context(TokioSpawnBlockingSnafu)??; - cert_tx + certificate_tx .send(cert) .await .map_err(|_err| CertificateResolverError::SendCertificateToChannel)?; diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 142ce23a4..cfd7c2e86 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -88,14 +88,14 @@ impl TlsServer { router: Router, options: WebhookOptions, ) -> Result<(Self, mpsc::Receiver)> { - let (cert_tx, cert_rx) = mpsc::channel(1); + let (certificate_tx, certificate_rx) = mpsc::channel(1); let WebhookOptions { socket_addr, subject_alterative_dns_names, } = options; - let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, cert_tx) + let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, certificate_tx) .await .context(CreateCertificateResolverSnafu)?; let cert_resolver = Arc::new(cert_resolver); @@ -117,7 +117,7 @@ impl TlsServer { router, }; - Ok((tls_server, cert_rx)) + Ok((tls_server, certificate_rx)) } /// Runs the TLS server by listening for incoming TCP connections on the