diff --git a/Cargo.lock b/Cargo.lock index 9a4939bd4..0d324aab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2996,6 +2996,7 @@ name = "stackable-webhook" version = "0.7.1" dependencies = [ "arc-swap", + "async-trait", "axum", "clap", "futures-util", @@ -3006,6 +3007,7 @@ dependencies = [ "opentelemetry", "opentelemetry-semantic-conventions", "rand 0.9.2", + "serde", "serde_json", "snafu 0.8.9", "stackable-certs", diff --git a/Cargo.toml b/Cargo.toml index 1f191b8db..3ac42776f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ repository = "https://github.com/stackabletech/operator-rs" [workspace.dependencies] product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.8.0" } -arc-swap = "1.7" +arc-swap = "1.7.0" +async-trait = "0.1.89" axum = { version = "0.8.1", features = ["http2"] } chrono = { version = "0.4.38", default-features = false } clap = { version = "4.5.17", features = ["derive", "cargo", "env"] } @@ -38,7 +39,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = ["schem # We use rustls instead of openssl for easier portability, e.g. 
so that we can build stackablectl without the need to vendor (build from source) openssl # We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev" # We pin the kube version, as we use a patch for 2.0.1 below -kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "rustls-tls", "ring"] } +kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } opentelemetry = "0.31.0" opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } opentelemetry-appender-tracing = "0.31.0" diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index a86b71342..a71ba1a48 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Add support for mutating webhooks ([#1119]). + +### Changed + +- BREAKING: Refactor the entire `WebhookServer` mechanism, so multiple webhooks can run in parallel. + Put individual webhooks (currently `ConversionWebhook` and `MutatingWebhook`) behind the `Webhook` trait ([#1119]). 
+ +[#1119]: https://github.com/stackabletech/operator-rs/pull/1119 + ## [0.7.1] - 2025-10-31 ### Fixed diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index 3dc40e036..0341ecb95 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -12,6 +12,7 @@ stackable-shared = { path = "../stackable-shared" } stackable-telemetry = { path = "../stackable-telemetry" } arc-swap.workspace = true +async-trait.workspace = true axum.workspace = true futures-util.workspace = true hyper-util.workspace = true @@ -21,6 +22,7 @@ kube.workspace = true opentelemetry.workspace = true opentelemetry-semantic-conventions.workspace = true rand.workspace = true +serde.workspace = true serde_json.workspace = true snafu.workspace = true tokio-rustls.workspace = true diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index ee33ab542..7e0ab998f 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -1,92 +1,99 @@ -//! Utility types and functions to easily create ready-to-use webhook servers -//! which can handle different tasks, for example CRD conversions. All webhook -//! servers use HTTPS by default. This library is fully compatible with the -//! [`tracing`] crate and emits debug level tracing data. +//! Utility types and functions to easily create ready-to-use webhook servers which can handle +//! different tasks. All webhook servers use HTTPS by default. //! -//! Most users will only use the top-level exported generic [`WebhookServer`] -//! which enables complete control over the [Router] which handles registering -//! routes and their handler functions. +//! Currently the following webhooks are supported: //! -//! ``` -//! use stackable_webhook::{WebhookServer, WebhookOptions}; -//! use axum::Router; +//! * [webhooks::ConversionWebhook] +//! * [webhooks::MutatingWebhook] +//! * In the future validating webhooks will be added //! -//! 
# async fn test() { -//! let router = Router::new(); -//! let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) -//! .await -//! .expect("failed to create WebhookServer"); -//! # } -//! ``` +//! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data. //! -//! For some usages, complete end-to-end [`WebhookServer`] implementations -//! exist. One such implementation is the [`ConversionWebhookServer`][1]. -//! -//! This library additionally also exposes lower-level structs and functions to -//! enable complete control over these details if needed. -//! -//! [1]: crate::servers::ConversionWebhookServer +//! For usage please look at the [`WebhookServer`] docs as well as the specific [`Webhook`] you are +//! using. use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; use axum::{Router, routing::get}; -use futures_util::{FutureExt as _, pin_mut, select}; +use futures_util::{FutureExt as _, TryFutureExt, select}; +use k8s_openapi::ByteString; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; use tokio::{ signal::unix::{SignalKind, signal}, sync::mpsc, + try_join, }; use tower::ServiceBuilder; +use webhooks::{Webhook, WebhookError}; +use x509_cert::der::{EncodePem, pem::LineEnding}; -// Selected re-exports -pub use crate::options::WebhookOptions; use crate::tls::TlsServer; -pub mod maintainer; -pub mod options; -pub mod servers; pub mod tls; - -/// A generic webhook handler receiving a request and sending back a response. -/// -/// This trait is not intended to be implemented by external crates and this -/// library provides various ready-to-use implementations for it. One such an -/// implementation is part of the [`ConversionWebhookServer`][1]. -/// -/// [1]: crate::servers::ConversionWebhookServer -pub trait WebhookHandler { - fn call(self, req: Req) -> Res; -} +pub mod webhooks; /// A result type alias with the [`WebhookError`] type as the default error type. 
-pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(Debug, Snafu)] -pub enum WebhookError { +pub enum WebhookServerError { #[snafu(display("failed to create TLS server"))] CreateTlsServer { source: tls::TlsServerError }, #[snafu(display("failed to run TLS server"))] RunTlsServer { source: tls::TlsServerError }, + + #[snafu(display("failed to update certificate"))] + UpdateCertificate { source: WebhookError }, + + #[snafu(display("failed to encode CA certificate as PEM format"))] + EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, } -/// A ready-to-use webhook server. +/// An HTTPS server that serves one or more webhooks. +/// +/// It also handles TLS certificate rotation. /// -/// This server abstracts away lower-level details like TLS termination -/// and other various configurations, validations or middlewares. The routes -/// and their handlers are completely customizable by bringing your own -/// Axum [`Router`]. +/// ### Example usage /// -/// For complete end-to-end implementations, see [`ConversionWebhookServer`][1]. +/// ``` +/// use stackable_webhook::WebhookServer; +/// use stackable_webhook::WebhookServerOptions; +/// use stackable_webhook::webhooks::Webhook; /// -/// [1]: crate::servers::ConversionWebhookServer +/// # async fn docs() { +/// let mut webhooks: Vec> = vec![]; +/// +/// let webhook_options = WebhookServerOptions { +/// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS, +/// webhook_namespace: "my-namespace".to_owned(), +/// webhook_service_name: "my-operator".to_owned(), +/// }; +/// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap(); +/// # } +/// ``` pub struct WebhookServer { + options: WebhookServerOptions, + webhooks: Vec>, tls_server: TlsServer, + cert_rx: mpsc::Receiver, +} + +#[derive(Clone, Debug)] +pub struct WebhookServerOptions { + /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to. 
+ pub socket_addr: SocketAddr, + + /// The namespace the webhook is running in. + pub webhook_namespace: String, + + /// The name of the Kubernetes service which points to the webhook. + pub webhook_service_name: String, } impl WebhookServer { - /// The default HTTPS port `8443` + /// The default HTTPS port pub const DEFAULT_HTTPS_PORT: u16 = 8443; /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, /// which represents binding on all network addresses. @@ -99,52 +106,13 @@ impl WebhookServer { pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); - /// Creates a new ready-to-use webhook server. + /// Creates a new webhook server with the given config and list of [`Webhook`]s. /// - /// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles - /// routing based on the provided Axum `router`. Most of the time it is sufficient to use - /// [`WebhookOptions::default()`]. See the documentation for [`WebhookOptions`] for more details - /// on the default values. - /// - /// To start the server, use the [`WebhookServer::run()`] function. This will - /// run the server using the Tokio runtime until it is terminated. 
- /// - /// ### Basic Example - /// - /// ``` - /// use stackable_webhook::{WebhookServer, WebhookOptions}; - /// use axum::Router; - /// - /// # async fn test() { - /// let router = Router::new(); - /// let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) - /// .await - /// .expect("failed to create WebhookServer"); - /// # } - /// ``` - /// - /// ### Example with Custom Options - /// - /// ``` - /// use stackable_webhook::{WebhookServer, WebhookOptions}; - /// use axum::Router; - /// - /// # async fn test() { - /// let options = WebhookOptions::builder() - /// .bind_address([127, 0, 0, 1], 8080) - /// .add_subject_alterative_dns_name("my-san-entry") - /// .build(); - /// - /// let router = Router::new(); - /// let (server, cert_rx) = WebhookServer::new(router, options) - /// .await - /// .expect("failed to create WebhookServer"); - /// # } - /// ``` + /// Please read their documentation for details. pub async fn new( - router: Router, - options: WebhookOptions, - ) -> Result<(Self, mpsc::Receiver)> { + options: WebhookServerOptions, + webhooks: Vec>, + ) -> Result { tracing::trace!("create new webhook server"); // TODO (@Techassi): Make opt-in configurable from the outside @@ -156,22 +124,33 @@ impl WebhookServer { // by the Axum project. // // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware - // TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important - let service_builder = ServiceBuilder::new().layer(trace_layer); + let trace_service_builder = ServiceBuilder::new().layer(trace_layer); // Create the root router and merge the provided router into it. tracing::debug!("create core router and merge provided router"); + let mut router = Router::new(); + for webhook in &webhooks { + router = webhook.register_routes(router); + } + let router = router - .layer(service_builder) + // Enrich spans for routes added above. 
+ // Routes defined below it will not be instrumented to reduce noise. + .layer(trace_service_builder) // The health route is below the AxumTraceLayer so as not to be instrumented .route("/health", get(|| async { "ok" })); tracing::debug!("create TLS server"); - let (tls_server, cert_rx) = TlsServer::new(router, options) + let (tls_server, cert_rx) = TlsServer::new(router, &options) .await .context(CreateTlsServerSnafu)?; - Ok((Self { tls_server }, cert_rx)) + Ok(Self { + options, + webhooks, + tls_server, + cert_rx, + }) } /// Runs the Webhook server and sets up signal handlers for shutting down. @@ -200,19 +179,58 @@ impl WebhookServer { }; // select requires Future + Unpin - pin_mut!(future_server); - pin_mut!(future_signal); - - futures_util::future::select(future_server, future_signal).await; + tokio::pin!(future_server); + tokio::pin!(future_signal); + + tokio::select! { + res = &mut future_server => { + // If the server future errors, propagate the error + res?; + } + _ = &mut future_signal => { + tracing::info!("shutdown signal received, stopping webhook server"); + } + } Ok(()) } - /// Runs the webhook server by creating a TCP listener and binding it to - /// the specified socket address. async fn run_server(self) -> Result<()> { tracing::debug!("run webhook server"); - self.tls_server.run().await.context(RunTlsServerSnafu) + let Self { + options, + mut webhooks, + tls_server, + mut cert_rx, + } = self; + let tls_server = tls_server + .run() + .map_err(|err| WebhookServerError::RunTlsServer { source: err }); + + let cert_update_loop = async { + loop { + while let Some(cert) = cert_rx.recv().await { + // The caBundle needs to be provided as a base64-encoded PEM envelope. 
+ let ca_bundle = cert + .to_pem(LineEnding::LF) + .context(EncodeCertificateAuthorityAsPemSnafu)?; + let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec()); + + for webhook in webhooks.iter_mut() { + webhook + .handle_certificate_rotation(&cert, &ca_bundle, &options) + .await + .context(UpdateCertificateSnafu)?; + } + } + } + + // We need to hint the return type to the compiler + #[allow(unreachable_code)] + Ok(()) + }; + + try_join!(cert_update_loop, tls_server).map(|_| ()) } } diff --git a/crates/stackable-webhook/src/maintainer.rs b/crates/stackable-webhook/src/maintainer.rs deleted file mode 100644 index b94da27aa..000000000 --- a/crates/stackable-webhook/src/maintainer.rs +++ /dev/null @@ -1,274 +0,0 @@ -use k8s_openapi::{ - ByteString, - apiextensions_apiserver::pkg::apis::apiextensions::v1::{ - CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, - WebhookConversion, - }, -}; -use kube::{ - Api, Client, ResourceExt, - api::{Patch, PatchParams}, -}; -use snafu::{ResultExt, Snafu, ensure}; -use tokio::sync::{mpsc, oneshot}; -use x509_cert::{ - Certificate, - der::{EncodePem, pem::LineEnding}, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("failed to encode CA certificate as PEM format"))] - EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, - - #[snafu(display("failed to send initial CRD reconcile heartbeat"))] - SendInitialReconcileHeartbeat, - - #[snafu(display("failed to patch CRD {crd_name:?}"))] - PatchCrd { - source: kube::Error, - crd_name: String, - }, -} - -/// Maintains various custom resource definitions. 
-/// -/// When running this, the following operations are done: -/// -/// - Apply the CRDs when starting up -/// - Reconcile the CRDs when the conversion webhook certificate is rotated -pub struct CustomResourceDefinitionMaintainer<'a> { - client: Client, - certificate_rx: mpsc::Receiver, - - definitions: Vec, - options: CustomResourceDefinitionMaintainerOptions<'a>, - - initial_reconcile_tx: oneshot::Sender<()>, -} - -impl<'a> CustomResourceDefinitionMaintainer<'a> { - /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more - /// custom resource definitions. - /// - /// ## Parameters - /// - /// This function expects four parameters: - /// - /// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches - /// the CRDs when the TLS certificate is rotated. - /// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The - /// certificate data sent through the channel is used to set the caBundle in the conversion - /// section of the CRD. - /// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained - /// by this maintainer. If the iterator is empty, the maintainer returns early without doing - /// any work. As such, a polling mechanism which waits for all futures should be used to - /// prevent premature termination of the operator. - /// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various - /// parts of the maintainer. In the future, this will be converted to a builder, to enable a - /// cleaner API interface. - /// - /// ## Return Values - /// - /// This function returns a 2-tuple (pair) of values: - /// - /// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer. - /// See [`CustomResourceDefinitionMaintainer::run`] for more details. - /// - The [`oneshot::Receiver`] which will be used to send out a message once the initial - /// CRD reconciliation ran. 
This signal can be used to trigger the deployment of custom - /// resources defined by the maintained CRDs. - /// - /// ## Example - /// - /// ```no_run - /// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; - /// # use tokio::sync::mpsc::channel; - /// # use x509_cert::Certificate; - /// # use kube::Client; - /// use stackable_webhook::maintainer::{ - /// CustomResourceDefinitionMaintainerOptions, - /// CustomResourceDefinitionMaintainer, - /// }; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # let (certificate_tx, certificate_rx) = channel(1); - /// let options = CustomResourceDefinitionMaintainerOptions { - /// operator_service_name: "my-service-name", - /// operator_namespace: "my-namespace", - /// field_manager: "my-field-manager", - /// webhook_https_port: 8443, - /// disabled: true, - /// }; - /// - /// let client = Client::try_default().await.unwrap(); - /// - /// let definitions = vec![ - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), - /// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(), - /// ]; - /// - /// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( - /// client, - /// certificate_rx, - /// definitions, - /// options, - /// ); - /// # } - /// ``` - pub fn new( - client: Client, - certificate_rx: mpsc::Receiver, - definitions: impl IntoIterator, - options: CustomResourceDefinitionMaintainerOptions<'a>, - ) -> (Self, oneshot::Receiver<()>) { - let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); - - let maintainer = Self { - definitions: definitions.into_iter().collect(), - initial_reconcile_tx, - certificate_rx, - options, - client, - }; - - (maintainer, initial_reconcile_rx) - } - - /// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously. - /// - /// This needs to be polled in parallel with other parts of an operator, like controllers or - /// webhook servers. 
If it is disabled, the returned future immediately resolves to - /// [`std::task::Poll::Ready`] and thus doesn't consume any resources. - pub async fn run(mut self) -> Result<(), Error> { - let CustomResourceDefinitionMaintainerOptions { - operator_service_name, - operator_namespace, - webhook_https_port, - field_manager, - disabled, - } = self.options; - - // If the maintainer is disabled or there are no custom resource definitions, immediately - // return without doing any work. - if disabled || self.definitions.is_empty() { - return Ok(()); - } - - // This channel can only be used exactly once. The sender's send method consumes self, and - // as such, the sender is wrapped in an Option to be able to call take to consume the inner - // value. - let mut initial_reconcile_tx = Some(self.initial_reconcile_tx); - - // This get's polled by the async runtime on a regular basis (or when woken up). Once we - // receive a message containing the newly generated TLS certificate for the conversion - // webhook, we need to update the caBundle in the CRD. - while let Some(certificate) = self.certificate_rx.recv().await { - tracing::info!( - k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::>(), - "reconciling custom resource definitions" - ); - - // The caBundle needs to be provided as a base64-encoded PEM envelope. 
- let ca_bundle = certificate - .to_pem(LineEnding::LF) - .context(EncodeCertificateAuthorityAsPemSnafu)?; - - let crd_api: Api = Api::all(self.client.clone()); - - for crd in self.definitions.iter_mut() { - let crd_kind = &crd.spec.names.kind; - let crd_name = crd.name_any(); - - tracing::debug!( - k8s.crd.kind = crd_kind, - k8s.crd.name = crd_name, - "reconciling custom resource definition" - ); - - crd.spec.conversion = Some(CustomResourceConversion { - strategy: "Webhook".to_owned(), - webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are - // supported by the webhook. The first version in the list understood by the - // API server is sent to the webhook. The webhook must respond with a - // ConversionReview object in the same version it received. We only support - // the stable v1 ConversionReview to keep the implementation as simple as - // possible. - conversion_review_versions: vec!["v1".to_owned()], - client_config: Some(WebhookClientConfig { - service: Some(ServiceReference { - name: operator_service_name.to_owned(), - namespace: operator_namespace.to_owned(), - path: Some(format!("/convert/{crd_name}")), - port: Some(webhook_https_port.into()), - }), - // Here, ByteString takes care of encoding the provided content as - // base64. - ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), - url: None, - }), - }), - }); - - // Deploy the updated CRDs using a server-side apply. - let patch = Patch::Apply(&crd); - - // We force apply here, because we want to become the sole manager of the CRD. 
This - // avoids any conflicts from previous deployments via helm or stackablectl which are - // reported with the following error message: - // - // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: - // - .spec.versions - // - .spec.conversion.strategy: Conflict - // - // The official Kubernetes documentation provides three options on how to solve - // these conflicts. Option 1 is used, which is described as follows: - // - // Overwrite value, become sole manager: If overwriting the value was intentional - // (or if the applier is an automated process like a controller) the applier should - // set the force query parameter to true [...], and make the request again. This - // forces the operation to succeed, changes the value of the field, and removes the - // field from all other managers' entries in managedFields. - // - // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts - let patch_params = PatchParams::apply(field_manager).force(); - - crd_api - .patch(&crd_name, &patch_params, &patch) - .await - .with_context(|_| PatchCrdSnafu { crd_name })?; - } - - // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out - // via the oneshot channel. - if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { - ensure!( - initial_reconcile_tx.send(()).is_ok(), - SendInitialReconcileHeartbeatSnafu - ); - } - } - - Ok(()) - } -} - -// TODO (@Techassi): Make this a builder instead -/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. -pub struct CustomResourceDefinitionMaintainerOptions<'a> { - /// The service name used by the operator/conversion webhook. - pub operator_service_name: &'a str, - - /// The namespace the operator/conversion webhook runs in. - pub operator_namespace: &'a str, - - /// The field manager used when maintaining the CRDs. - pub field_manager: &'a str, - - /// The HTTPS port the conversion webhook listens on. 
- pub webhook_https_port: u16, - - /// Indicates if the maintainer should be disabled. - pub disabled: bool, -} diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs deleted file mode 100644 index b7eeaffea..000000000 --- a/crates/stackable-webhook/src/options.rs +++ /dev/null @@ -1,149 +0,0 @@ -//! Contains available options to configure the [WebhookServer]. - -use std::{ - net::{IpAddr, SocketAddr}, - path::PathBuf, -}; - -use stackable_certs::PrivateKeyType; - -use crate::WebhookServer; - -/// Specifies available webhook server options. -/// -/// The [`Default`] implementation for this struct contains the following values: -/// -/// - The socket binds to 127.0.0.1 on port 8443 (HTTPS) -/// - An empty list of SANs is provided to the certificate the TLS server uses. -/// -/// ### Example with Custom HTTPS IP Address and Port -/// -/// ``` -/// use stackable_webhook::WebhookOptions; -/// -/// // Set IP address and port at the same time -/// let options = WebhookOptions::builder() -/// .bind_address([0, 0, 0, 0], 12345) -/// .build(); -/// -/// // Set IP address only -/// let options = WebhookOptions::builder() -/// .bind_ip([0, 0, 0, 0]) -/// .build(); -/// -/// // Set port only -/// let options = WebhookOptions::builder() -/// .bind_port(12345) -/// .build(); -/// ``` -#[derive(Debug)] -pub struct WebhookOptions { - /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] - /// binds to. - pub socket_addr: SocketAddr, - - /// The subject alterative DNS names that should be added to the certificates generated for this - /// webhook. - pub subject_alterative_dns_names: Vec, -} - -impl Default for WebhookOptions { - fn default() -> Self { - Self::builder().build() - } -} - -impl WebhookOptions { - /// Returns the default [`WebhookOptionsBuilder`] which allows to selectively - /// customize the options. See the documentation for [`WebhookOptions`] for more - /// information on available functions. 
- pub fn builder() -> WebhookOptionsBuilder { - WebhookOptionsBuilder::default() - } -} - -/// The [`WebhookOptionsBuilder`] which allows to selectively customize the webhook -/// server [`WebhookOptions`]. -/// -/// Usually, this struct is not constructed manually, but instead by calling -/// [`WebhookOptions::builder()`] or [`WebhookOptionsBuilder::default()`]. -#[derive(Debug, Default)] -pub struct WebhookOptionsBuilder { - socket_addr: Option, - subject_alterative_dns_names: Vec, -} - -impl WebhookOptionsBuilder { - /// Sets the socket address the webhook server uses to bind for HTTPS. - pub fn bind_address(mut self, bind_ip: impl Into, bind_port: u16) -> Self { - self.socket_addr = Some(SocketAddr::new(bind_ip.into(), bind_port)); - self - } - - /// Sets the IP address of the socket address the webhook server uses to - /// bind for HTTPS. - pub fn bind_ip(mut self, bind_ip: impl Into) -> Self { - let addr = self - .socket_addr - .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); - addr.set_ip(bind_ip.into()); - self - } - - /// Sets the port of the socket address the webhook server uses to bind - /// for HTTPS. - pub fn bind_port(mut self, bind_port: u16) -> Self { - let addr = self - .socket_addr - .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); - addr.set_port(bind_port); - self - } - - /// Sets the subject alterative DNS names that should be added to the certificates generated for - /// this webhook. - pub fn subject_alterative_dns_names( - mut self, - subject_alterative_dns_name: Vec, - ) -> Self { - self.subject_alterative_dns_names = subject_alterative_dns_name; - self - } - - /// Adds the subject alterative DNS name to the list of names. - pub fn add_subject_alterative_dns_name( - mut self, - subject_alterative_dns_name: impl Into, - ) -> Self { - self.subject_alterative_dns_names - .push(subject_alterative_dns_name.into()); - self - } - - /// Builds the final [`WebhookOptions`] by using default values for any not - /// explicitly set option. 
- pub fn build(self) -> WebhookOptions { - WebhookOptions { - socket_addr: self - .socket_addr - .unwrap_or(WebhookServer::DEFAULT_SOCKET_ADDRESS), - subject_alterative_dns_names: self.subject_alterative_dns_names, - } - } -} - -#[derive(Debug)] -pub enum TlsOption { - AutoGenerate, - Mount { - private_key_type: PrivateKeyType, - private_key_path: PathBuf, - certificate_path: PathBuf, - }, -} - -impl Default for TlsOption { - fn default() -> Self { - Self::AutoGenerate - } -} diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs deleted file mode 100644 index f15b4b0a3..000000000 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ /dev/null @@ -1,322 +0,0 @@ -use std::{fmt::Debug, net::SocketAddr}; - -use axum::{Json, Router, routing::post}; -use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; -// Re-export this type because users of the conversion webhook server require -// this type to write the handler function. Instead of importing this type from -// kube directly, consumers can use this type instead. This also eliminates -// keeping the kube dependency version in sync between here and the operator. 
-pub use kube::core::conversion::ConversionReview; -use kube::{Client, ResourceExt}; -use snafu::{ResultExt, Snafu}; -use tokio::sync::{mpsc, oneshot}; -use tracing::instrument; -use x509_cert::Certificate; - -use crate::{ - WebhookError, WebhookHandler, WebhookServer, - maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions}, - options::WebhookOptions, -}; - -#[derive(Debug, Snafu)] -pub enum ConversionWebhookError { - #[snafu(display("failed to create webhook server"))] - CreateWebhookServer { source: WebhookError }, - - #[snafu(display("failed to run webhook server"))] - RunWebhookServer { source: WebhookError }, - - #[snafu(display("failed to receive certificate from channel"))] - ReceiveCertificateFromChannel, - - #[snafu(display("failed to convert CA certificate into PEM format"))] - ConvertCaToPem { source: x509_cert::der::Error }, - - #[snafu(display("failed to reconcile CRDs"))] - ReconcileCrds { - #[snafu(source(from(ConversionWebhookError, Box::new)))] - source: Box, - }, - - #[snafu(display("failed to update CRD {crd_name:?}"))] - UpdateCrd { - source: kube::Error, - crd_name: String, - }, -} - -impl WebhookHandler for F -where - F: FnOnce(ConversionReview) -> ConversionReview, -{ - fn call(self, req: ConversionReview) -> ConversionReview { - self(req) - } -} - -// TODO: Add a builder, maybe with `bon`. -#[derive(Debug)] -pub struct ConversionWebhookOptions<'a> { - /// The bind address to bind the HTTPS server to. - pub socket_addr: SocketAddr, - - /// The namespace the operator/webhook is running in. - pub namespace: &'a str, - - /// The name of the Kubernetes service which points to the operator/webhook. - pub service_name: &'a str, -} - -/// A ready-to-use CRD conversion webhook server. -/// -/// See [`ConversionWebhookServer::new()`] for usage examples. 
-pub struct ConversionWebhookServer(WebhookServer); - -impl ConversionWebhookServer { - /// The default socket address the conversion webhook server binds to, see - /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`]. - pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS; - - /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being - /// made to the `/convert/{CRD_NAME}` endpoint. - /// - /// The TLS certificate is automatically generated and rotated. - /// - /// ## Parameters - /// - /// This function expects the following parameters: - /// - /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] - /// to a handler function. In most cases, the generated `CustomResource::try_merge` function - /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` - /// signature. - /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the - /// webhook server, eg. the socket address used to listen on. - /// - /// ## Return Values - /// - /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`] - /// variant: - /// - /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See - /// [`ConversionWebhookServer::run`] for more details. - /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly - /// generated TLS certificate. This channel is used by the CRD maintainer to trigger a - /// reconcile of the CRDs it maintains. 
- /// - /// ## Example - /// - /// ```no_run - /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; - /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; - /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # CryptoProvider::install_default(default_provider()).unwrap(); - /// let crds_and_handlers = vec![ - /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("the S3Connection CRD must be merged"), - /// S3Connection::try_convert, - /// ) - /// ]; - /// - /// let options = ConversionWebhookOptions { - /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS, - /// namespace: "stackable-operators", - /// service_name: "product-operator", - /// }; - /// - /// let (conversion_webhook_server, _certificate_rx) = - /// ConversionWebhookServer::new(crds_and_handlers, options) - /// .await - /// .unwrap(); - /// - /// conversion_webhook_server.run().await.unwrap(); - /// # } - /// ``` - #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] - pub async fn new( - crds_and_handlers: impl IntoIterator, - options: ConversionWebhookOptions<'_>, - ) -> Result<(Self, mpsc::Receiver), ConversionWebhookError> - where - H: WebhookHandler + Clone + Send + Sync + 'static, - { - tracing::debug!("create new conversion webhook server"); - - let mut router = Router::new(); - - for (crd, handler) in crds_and_handlers { - let crd_name = crd.name_any(); - let handler_fn = |Json(review): Json| async { - let review = handler.call(review); - Json(review) - }; - - // TODO (@Techassi): Make this part of the trait mentioned above - let route = format!("/convert/{crd_name}"); - router = router.route(&route, post(handler_fn)); - } - - let ConversionWebhookOptions { - socket_addr, - namespace: operator_namespace, - service_name: operator_service_name, - } = &options; - - // This is how 
Kubernetes calls us, so it decides about the naming. - // AFAIK we can not influence this, so this is the only SAN entry needed. - // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) - // HTTP clients can use the FQDN of the service for testing or user use-cases. - let subject_alterative_dns_name = - format!("{operator_service_name}.{operator_namespace}.svc",); - - let webhook_options = WebhookOptions { - subject_alterative_dns_names: vec![subject_alterative_dns_name], - socket_addr: *socket_addr, - }; - - let (server, certificate_rx) = WebhookServer::new(router, webhook_options) - .await - .context(CreateWebhookServerSnafu)?; - - Ok((Self(server), certificate_rx)) - } - - /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`], - /// and a [`oneshot::Receiver`]. - /// - /// ## Parameters - /// - /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] - /// to a handler function. In most cases, the generated `CustomResource::try_merge` function - /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` - /// signature. - /// - `operator_service_name`: The name of the Kubernetes service name which points to the - /// operator/conversion webhook. This is used to construct the service reference in the CRD - /// `spec.conversion` field. - /// - `operator_namespace`: The namespace the operator runs in. This is used to construct the - /// service reference in the CRD `spec.conversion` field. - /// - `disable_maintainer`: A boolean value to indicate if the [`CustomResourceDefinitionMaintainer`] - /// should be disabled. - /// - `client`: A [`kube::Client`] used to maintain the custom resource definitions. - /// - /// See the referenced items for more details on usage. - /// - /// ## Return Values - /// - /// - The [`ConversionWebhookServer`] itself. This is used to run the server. 
See - /// [`ConversionWebhookServer::run`] for more details. - /// - The [`CustomResourceDefinitionMaintainer`] which is used to run the maintainer. See - /// [`CustomResourceDefinitionMaintainer::run`] for more details. - /// - A [`oneshot::Receiver`] which is triggered after the initial reconciliation of the CRDs - /// succeeded. This signal can be used to deploy any custom resources defined by these CRDs. - /// - /// ## Example - /// - /// ```no_run - /// # use futures_util::TryFutureExt; - /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; - /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; - /// use stackable_operator::{kube::Client, crd::s3::{S3Connection, S3ConnectionVersion}}; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # CryptoProvider::install_default(default_provider()).unwrap(); - /// let client = Client::try_default().await.unwrap(); - /// - /// let crds_and_handlers = vec![ - /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("the S3Connection CRD must be merged"), - /// S3Connection::try_convert, - /// ) - /// ]; - /// - /// let (conversion_webhook_server, crd_maintainer, _initial_reconcile_rx) = - /// ConversionWebhookServer::with_maintainer( - /// crds_and_handlers, - /// "my-operator", - /// "my-namespace", - /// "my-field-manager", - /// false, - /// client, - /// ) - /// .await - /// .unwrap(); - /// - /// let conversion_webhook_server = conversion_webhook_server - /// .run() - /// .map_err(|err| err.to_string()); - /// - /// let crd_maintainer = crd_maintainer - /// .run() - /// .map_err(|err| err.to_string()); - /// - /// // Run both the conversion webhook server and crd_maintainer concurrently, eg. with - /// // futures::try_join!. 
- /// futures_util::try_join!(conversion_webhook_server, crd_maintainer).unwrap(); - /// # } - /// ``` - pub async fn with_maintainer<'a, H>( - // TODO (@Techassi): Use a trait type here which can be used to build all part of the - // conversion webhook server and a CRD maintainer. - crds_and_handlers: impl IntoIterator + Clone, - operator_service_name: &'a str, - operator_namespace: &'a str, - field_manager: &'a str, - disable_maintainer: bool, - client: Client, - ) -> Result< - ( - Self, - CustomResourceDefinitionMaintainer<'a>, - oneshot::Receiver<()>, - ), - ConversionWebhookError, - > - where - H: WebhookHandler + Clone + Send + Sync + 'static, - { - let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS; - - // TODO (@Techassi): These should be moved into a builder - let webhook_options = ConversionWebhookOptions { - service_name: operator_service_name, - namespace: operator_namespace, - socket_addr, - }; - - let (conversion_webhook_server, certificate_rx) = - Self::new(crds_and_handlers.clone(), webhook_options).await?; - - let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd); - - // TODO (@Techassi): These should be moved into a builder - let maintainer_options = CustomResourceDefinitionMaintainerOptions { - webhook_https_port: socket_addr.port(), - disabled: disable_maintainer, - operator_service_name, - operator_namespace, - field_manager, - }; - - let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( - client, - certificate_rx, - definitions, - maintainer_options, - ); - - Ok((conversion_webhook_server, maintainer, initial_reconcile_rx)) - } - - /// Runs the [`ConversionWebhookServer`] asynchronously. 
- pub async fn run(self) -> Result<(), ConversionWebhookError> { - tracing::info!("run conversion webhook server"); - self.0.run().await.context(RunWebhookServerSnafu) - } -} diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/servers/mod.rs deleted file mode 100644 index 6fbadc12d..000000000 --- a/crates/stackable-webhook/src/servers/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Contains high-level ready-to-use webhook server implementations for specific -//! purposes. -mod conversion; - -pub use conversion::{ConversionWebhookError, ConversionWebhookOptions, ConversionWebhookServer}; diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index cfd7c2e86..fa493159d 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -31,7 +31,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use x509_cert::Certificate; use crate::{ - options::WebhookOptions, + WebhookServerOptions, tls::cert_resolver::{CertificateResolver, CertificateResolverError}, }; @@ -86,15 +86,23 @@ impl TlsServer { #[instrument(name = "create_tls_server", skip(router))] pub async fn new( router: Router, - options: WebhookOptions, + options: &WebhookServerOptions, ) -> Result<(Self, mpsc::Receiver)> { let (certificate_tx, certificate_rx) = mpsc::channel(1); - let WebhookOptions { + let WebhookServerOptions { socket_addr, - subject_alterative_dns_names, + webhook_namespace, + webhook_service_name, } = options; + // This is how Kubernetes calls us, so it decides about the naming. + // AFAIK we can not influence this, so this is the only SAN entry needed. + // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) + // HTTP clients can use the FQDN of the service for testing or user use-cases. 
+ let subject_alterative_dns_names = + vec![format!("{webhook_service_name}.{webhook_namespace}.svc")]; + let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, certificate_tx) .await .context(CreateCertificateResolverSnafu)?; @@ -113,13 +121,17 @@ impl TlsServer { let tls_server = Self { config, cert_resolver, - socket_addr, + socket_addr: *socket_addr, router, }; Ok((tls_server, certificate_rx)) } + pub fn socket_addr(&self) -> &SocketAddr { + &self.socket_addr + } + /// Runs the TLS server by listening for incoming TCP connections on the /// bound socket address. It only accepts TLS connections. Internally each /// TLS stream get handled by a Hyper service, which in turn is an Axum diff --git a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs new file mode 100644 index 000000000..7712bb859 --- /dev/null +++ b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs @@ -0,0 +1,201 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use axum::{Json, Router, routing::post}; +use k8s_openapi::{ + ByteString, + apiextensions_apiserver::pkg::apis::apiextensions::v1::{ + CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, + WebhookConversion, + }, +}; +// Re-export this type because users of the conversion webhook server require +// this type to write the handler function. Instead of importing this type from +// kube directly, consumers can use this type instead. This also eliminates +// keeping the kube dependency version in sync between here and the operator. 
+pub use kube::core::conversion::ConversionReview; +use kube::{ + Api, Client, ResourceExt, + api::{Patch, PatchParams}, +}; +use snafu::{ResultExt, Snafu, ensure}; +use tokio::sync::oneshot; +use tracing::instrument; +use x509_cert::Certificate; + +use super::{Webhook, WebhookError}; +use crate::WebhookServerOptions; + +#[derive(Debug, Snafu)] +pub enum ConversionWebhookError { + #[snafu(display("failed to send initial CRD reconcile heartbeat"))] + SendInitialReconcileHeartbeat, + + #[snafu(display("failed to patch CRD {crd_name:?}"))] + PatchCrd { + source: kube::Error, + crd_name: String, + }, +} + +pub struct ConversionWebhook { + crds_and_handlers: Vec<(CustomResourceDefinition, H)>, + disable_crd_maintenance: bool, + client: Client, + /// The field manager used when maintaining the CRDs. + field_manager: String, + // This channel can only be used exactly once. The sender's send method consumes self, and + // as such, the sender is wrapped in an Option to be able to call take to consume the inner + // value. 
+ initial_reconcile_tx: Option>, +} + +impl ConversionWebhook { + pub fn new( + crds_and_handlers: impl IntoIterator, + disable_crd_maintenance: bool, + client: Client, + field_manager: String, + ) -> (Self, oneshot::Receiver<()>) { + let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); + + ( + Self { + crds_and_handlers: crds_and_handlers.into_iter().collect(), + disable_crd_maintenance, + client, + field_manager, + initial_reconcile_tx: Some(initial_reconcile_tx), + }, + initial_reconcile_rx, + ) + } + + #[instrument( + skip(self, crd, crd_api), + fields( + name = crd.name_any(), + kind = &crd.spec.names.kind + ) + )] + async fn reconcile_crd( + &self, + mut crd: CustomResourceDefinition, + crd_api: &Api, + new_ca_bundle: &ByteString, + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { + let crd_kind = &crd.spec.names.kind; + let crd_name = crd.name_any(); + + tracing::info!( + k8s.crd.kind = crd_kind, + k8s.crd.name = crd_name, + "reconciling custom resource definition" + ); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_owned(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are + // supported by the webhook. The first version in the list understood by the + // API server is sent to the webhook. The webhook must respond with a + // ConversionReview object in the same version it received. We only support + // the stable v1 ConversionReview to keep the implementation as simple as + // possible. + conversion_review_versions: vec!["v1".to_owned()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: options.webhook_service_name.to_owned(), + namespace: options.webhook_namespace.to_owned(), + path: Some(format!("/convert/{crd_name}")), + port: Some(options.socket_addr.port().into()), + }), + // Here, ByteString takes care of encoding the provided content as base64. 
+ ca_bundle: Some(new_ca_bundle.to_owned()), + url: None, + }), + }), + }); + + // Deploy the updated CRDs using a server-side apply. + let patch = Patch::Apply(&crd); + + // We force apply here, because we want to become the sole manager of the CRD. This + // avoids any conflicts from previous deployments via helm or stackablectl which are + // reported with the following error message: + // + // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: + // - .spec.versions + // - .spec.conversion.strategy: Conflict + // + // The official Kubernetes documentation provides three options on how to solve + // these conflicts. Option 1 is used, which is described as follows: + // + // Overwrite value, become sole manager: If overwriting the value was intentional + // (or if the applier is an automated process like a controller) the applier should + // set the force query parameter to true [...], and make the request again. This + // forces the operation to succeed, changes the value of the field, and removes the + // field from all other managers' entries in managedFields. 
+ // + // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts + let patch_params = PatchParams::apply(&self.field_manager).force(); + + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| PatchCrdSnafu { crd_name })?; + + Ok(()) + } +} + +#[async_trait] +impl Webhook for ConversionWebhook +where + H: FnOnce(ConversionReview) -> ConversionReview + Clone + Send + Sync + 'static, +{ + fn register_routes(&self, mut router: Router) -> Router { + for (crd, handler) in self.crds_and_handlers.clone() { + let crd_name = crd.name_any(); + let handler_fn = |Json(review): Json| async { + let review = handler(review); + Json(review) + }; + + let route = format!("/convert/{crd_name}"); + router = router.route(&route, post(handler_fn)); + } + + router + } + + #[instrument(skip(self))] + async fn handle_certificate_rotation( + &mut self, + _new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { + if self.disable_crd_maintenance { + return Ok(()); + } + + let crd_api: Api = Api::all(self.client.clone()); + for (crd, _) in &self.crds_and_handlers { + self.reconcile_crd(crd.clone(), &crd_api, new_ca_bundle, options) + .await?; + } + + // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out + // via the oneshot channel. 
+ if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() { + ensure!( + initial_reconcile_tx.send(()).is_ok(), + SendInitialReconcileHeartbeatSnafu + ); + } + + Ok(()) + } +} diff --git a/crates/stackable-webhook/src/webhooks/mod.rs b/crates/stackable-webhook/src/webhooks/mod.rs new file mode 100644 index 000000000..c354b831f --- /dev/null +++ b/crates/stackable-webhook/src/webhooks/mod.rs @@ -0,0 +1,48 @@ +use async_trait::async_trait; +use axum::Router; +pub use conversion_webhook::{ConversionReview, ConversionWebhook, ConversionWebhookError}; +use k8s_openapi::ByteString; +pub use mutating_webhook::{MutatingWebhook, MutatingWebhookError}; +use snafu::Snafu; +use x509_cert::Certificate; + +use crate::WebhookServerOptions; + +mod conversion_webhook; +mod mutating_webhook; + +#[derive(Snafu, Debug)] +pub enum WebhookError { + #[snafu(display("conversion webhook error"), context(false))] + ConversionWebhookError { + source: conversion_webhook::ConversionWebhookError, + }, + + #[snafu(display("mutating webhook error"), context(false))] + MutatingWebhookError { + source: mutating_webhook::MutatingWebhookError, + }, +} + +/// A webhook (such as a conversion or mutating webhook) needs to implement this trait. +// +// We still need to use the async-trait crate, as Rust 1.91.1 does not support dynamic dispatch +// in combination with async functions. +#[async_trait] +pub trait Webhook { + /// The webhook can add arbitrary routes to the passed [`Router`] and needs to return the + /// resulting [`Router`]. + fn register_routes(&self, router: Router) -> Router; + + /// The HTTPS server periodically rotates its certificate. + /// + /// Typically, some caller of the webhook (e.g. Kubernetes) needs to know the certificate to be + /// able to establish the TLS connection. + /// Webhooks are informed about new certificates by this function and can react accordingly.
+ async fn handle_certificate_rotation( + &mut self, + new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookServerOptions, + ) -> Result<(), WebhookError>; +} diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs new file mode 100644 index 000000000..82a883ff1 --- /dev/null +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -0,0 +1,231 @@ +use std::{fmt::Debug, marker::PhantomData, sync::Arc}; + +use async_trait::async_trait; +use axum::{Json, Router, routing::post}; +use k8s_openapi::{ + ByteString, + api::admissionregistration::v1::{ + MutatingWebhookConfiguration, ServiceReference, WebhookClientConfig, + }, +}; +use kube::{ + Api, Client, Resource, ResourceExt, + api::{Patch, PatchParams}, + core::admission::{AdmissionRequest, AdmissionResponse, AdmissionReview}, +}; +use serde::{Serialize, de::DeserializeOwned}; +use snafu::{ResultExt, Snafu}; +use tracing::instrument; +use x509_cert::Certificate; + +use super::{Webhook, WebhookError}; +use crate::WebhookServerOptions; + +#[derive(Debug, Snafu)] +pub enum MutatingWebhookError { + #[snafu(display("failed to patch MutatingWebhookConfiguration {mwc_name:?}"))] + PatchMutatingWebhookConfiguration { + source: kube::Error, + mwc_name: String, + }, +} + +/// Mutating webhook, which lets you intercept object creations/modifications and modify the object +/// on the fly. +/// +/// As the webhook is typed with the Resource type `R`, it can only handle a single resource +/// mutation. Use multiple [`MutatingWebhook`] if you need to mutate multiple resource kinds. +/// +/// ### Example usage +/// +/// This is only some high-level basic usage! +/// +/// For concrete usage please have a look at the restart controller mutating webhook in +/// commons-operator.
+/// +/// ``` +/// use std::sync::Arc; +/// +/// use k8s_openapi::api::admissionregistration::v1::MutatingWebhookConfiguration; +/// use k8s_openapi::api::apps::v1::StatefulSet; +/// +/// use stackable_operator::builder::meta::ObjectMetaBuilder; +/// use stackable_operator::kube::Client; +/// use stackable_operator::kube::core::admission::{AdmissionRequest, AdmissionResponse}; +/// use stackable_operator::kvp::Label; +/// use stackable_webhook::WebhookServer; +/// use stackable_webhook::webhooks::MutatingWebhook; +/// +/// # async fn docs() { +/// // The Kubernetes client +/// let client = Client::try_default().await.unwrap(); +/// // The context of the controller, e.g. contains a Kubernetes client +/// let ctx = Arc::new(()); +/// // Read in from user input, e.g. CLI arguments +/// let disable_restarter_mutating_webhook = false; +/// +/// let mutating_webhook = Box::new(MutatingWebhook::new( +/// get_mutating_webhook_configuration(), +/// my_handler, +/// ctx, +/// disable_restarter_mutating_webhook, +/// client, +/// "my-field-manager".to_owned(), +/// )); +/// +/// let webhook_options = todo!(); +/// let webhook_server = WebhookServer::new(webhook_options, vec![mutating_webhook]).await.unwrap(); +/// webhook_server.run().await.unwrap(); +/// # } +/// +/// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration { +/// let webhook_name = "pod-labeler.stackable.tech"; +/// +/// MutatingWebhookConfiguration { +/// webhooks: Some(vec![k8s_openapi::api::admissionregistration::v1::MutatingWebhook { +/// // This is checked by the stackable_webhook code +/// admission_review_versions: vec!["v1".to_owned()], +/// ..Default::default() +/// }]), +/// ..Default::default() +/// } +/// } +/// +/// // Basic no-op implementation +/// pub async fn my_handler( +/// ctx: Arc<()>, +/// request: AdmissionRequest, +/// ) -> AdmissionResponse { +/// AdmissionResponse::from(&request) +/// } +/// ``` +pub struct MutatingWebhook { + mutating_webhook_configuration: 
MutatingWebhookConfiguration, + handler: H, + handler_state: Arc, + _resource: PhantomData, + + disable_mutating_webhook_configuration_maintenance: bool, + client: Client, + + /// The field manager used when maintaining the MutatingWebhookConfiguration. + field_manager: String, +} + +impl MutatingWebhook { + /// All webhooks need to set the admissionReviewVersions to `["v1"]`, as this mutating webhook + /// only supports that version! A failure to do so will result in a panic. + /// + /// Your [`MutatingWebhookConfiguration`] can contain 0..n webhooks, but it is recommended to + /// only have a single entry in there, as the clientConfig of all entries will be set to the + /// same service, port and HTTP path. + pub fn new( + mutating_webhook_configuration: MutatingWebhookConfiguration, + handler: H, + handler_state: Arc, + disable_mutating_webhook_configuration_maintenance: bool, + client: Client, + field_manager: String, + ) -> Self { + for webhook in mutating_webhook_configuration.webhooks.iter().flatten() { + assert_eq!( + webhook.admission_review_versions, + vec!["v1"], + "We decide how we de-serialize the JSON and with that what AdmissionReview version we support (currently only v1)" + ); + } + + Self { + mutating_webhook_configuration, + handler, + handler_state, + _resource: PhantomData, + disable_mutating_webhook_configuration_maintenance, + client, + field_manager, + } + } + + fn http_path(&self) -> String { + let mutating_webhook_configuration_name = self.mutating_webhook_configuration.name_any(); + format!("/mutate/{mutating_webhook_configuration_name}") + } +} + +#[async_trait] +impl Webhook for MutatingWebhook +where + H: Fn(Arc, AdmissionRequest) -> Fut + Clone + Send + Sync + 'static, + Fut: Future + Send + 'static, + R: Resource + Send + Sync + DeserializeOwned + Serialize + 'static, + S: Send + Sync + 'static, +{ + fn register_routes(&self, router: Router) -> Router { + let handler_state = self.handler_state.clone(); + let handler = self.handler.clone(); + let handler_fn
= |Json(review): Json>| async move { + let request: AdmissionRequest = match review.try_into() { + Ok(request) => request, + Err(err) => { + return Json( + AdmissionResponse::invalid(format!("failed to convert to request: {err}")) + .into_review(), + ); + } + }; + + let response = handler(handler_state, request).await; + let review = response.into_review(); + Json(review) + }; + + let route = self.http_path(); + router.route(&route, post(handler_fn)) + } + + #[instrument(skip(self))] + async fn handle_certificate_rotation( + &mut self, + _new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { + if self.disable_mutating_webhook_configuration_maintenance { + return Ok(()); + } + + let mut mutating_webhook_configuration = self.mutating_webhook_configuration.clone(); + let mwc_name = mutating_webhook_configuration.name_any(); + tracing::info!( + k8s.mutatingwebhookconfiguration.name = mwc_name, + "reconciling mutating webhook configurations" + ); + + for webhook in mutating_webhook_configuration.webhooks.iter_mut().flatten() { + // We know how we can be called (and with what certificate), so we can always set that + webhook.client_config = WebhookClientConfig { + service: Some(ServiceReference { + name: options.webhook_service_name.to_owned(), + namespace: options.webhook_namespace.to_owned(), + path: Some(self.http_path()), + port: Some(options.socket_addr.port().into()), + }), + // Here, ByteString takes care of encoding the provided content as base64. 
+ ca_bundle: Some(new_ca_bundle.to_owned()), + url: None, + }; + } + + let mwc_api: Api = Api::all(self.client.clone()); + // Other than with the CRDs we don't need to force-apply the MutatingWebhookConfiguration + let patch = Patch::Apply(&mutating_webhook_configuration); + let patch_params = PatchParams::apply(&self.field_manager); + + mwc_api + .patch(&mwc_name, &patch_params, &patch) + .await + .with_context(|_| PatchMutatingWebhookConfigurationSnafu { mwc_name })?; + + Ok(()) + } +}