diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 05b18d722..8534f7bd0 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -8,6 +8,12 @@ All notable changes to this project will be documented in this file. ### Added +- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS + certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot` + channel which can for example be used to trigger creation/patching of any custom resources deployed by + the operator ([#1099]). +- Add a `Client::create_if_missing` associated function to create a resource if it doesn't + exist ([#1099]). - Add CLI argument and env var to disable the end-of-support checker: `EOS_DISABLED` (`--eos-disabled`) ([#1101]). - Add end-of-support checker ([#1096]). - The EoS checker can be constructed using `EndOfSupportChecker::new()`. @@ -28,6 +34,7 @@ All notable changes to this project will be documented in this file. [#1096]: https://github.com/stackabletech/operator-rs/pull/1096 [#1098]: https://github.com/stackabletech/operator-rs/pull/1098 +[#1099]: https://github.com/stackabletech/operator-rs/pull/1099 [#1101]: https://github.com/stackabletech/operator-rs/pull/1101 [#1103]: https://github.com/stackabletech/operator-rs/pull/1103 diff --git a/crates/stackable-operator/Cargo.toml b/crates/stackable-operator/Cargo.toml index 09beae530..aecdee49f 100644 --- a/crates/stackable-operator/Cargo.toml +++ b/crates/stackable-operator/Cargo.toml @@ -24,7 +24,7 @@ stackable-operator-derive = { path = "../stackable-operator-derive" } stackable-shared = { path = "../stackable-shared", features = ["chrono", "time"] } stackable-telemetry = { path = "../stackable-telemetry", optional = true, features = ["clap"] } stackable-versioned = { path = "../stackable-versioned", optional = true } -stackable-webhook = { path = "../stackable-webhook", optional = true } +stackable-webhook = { path = "../stackable-webhook", optional = true} chrono.workspace = true clap.workspace = true diff --git a/crates/stackable-operator/src/client.rs b/crates/stackable-operator/src/client.rs index f79a1eb91..b910eaba9 100644 --- a/crates/stackable-operator/src/client.rs +++ b/crates/stackable-operator/src/client.rs @@ -253,6 +253,25 @@ impl Client { }) } + /// Optionally creates a resource if it does not exist yet. + /// + /// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`]. + /// This function either returns the existing resource or the newly created one. + pub async fn create_if_missing(&self, resource: &T) -> Result + where + T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi, + ::DynamicType: Default, + { + if let Some(r) = self + .get_opt(&resource.name_any(), resource.get_namespace()) + .await? + { + return Ok(r); + } + + self.create(resource).await + } + /// Patches a resource using the `MERGE` patch strategy described /// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386) /// This will fail for objects that do not exist yet. diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index cd116e86d..9d8b0e602 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- BREAKING: The `ConversionWebhookServer` now returns a pair of values ([#1099]): + - The conversion webhook server itself + - A `mpsc::Receiver` to provide consumers the newly generated TLS certificate +- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on + `(Conversion)WebhookServer` instead of free-standing ones ([#1099]). + +### Removed + +- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions` + are removed ([#1099]). + +[#1099]: https://github.com/stackabletech/operator-rs/pull/1099 + ## [0.6.0] - 2025-09-09 ### Added diff --git a/crates/stackable-webhook/src/constants.rs b/crates/stackable-webhook/src/constants.rs deleted file mode 100644 index bd5a49c27..000000000 --- a/crates/stackable-webhook/src/constants.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Contains various constant definitions, mostly for default ports and IP -//! addresses. -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - -/// The default HTTPS port `8443` -pub const DEFAULT_HTTPS_PORT: u16 = 8443; - -// The HTTPS port the conversion webhook runs at -pub const CONVERSION_WEBHOOK_HTTPS_PORT: u16 = DEFAULT_HTTPS_PORT; - -/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, -/// which represents binding on all network addresses. -// -// TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4 -// and IPv6. However, it's complicated and depends on the underlying system... -// If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults. -pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - -/// The default socket address `0.0.0.0:8443` the webhook server binds to. -pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = - SocketAddr::new(DEFAULT_LISTEN_ADDRESS, DEFAULT_HTTPS_PORT); diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index c98515e1e..ee33ab542 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -26,6 +26,9 @@ //! enable complete control over these details if needed. //! //! [1]: crate::servers::ConversionWebhookServer +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use ::x509_cert::Certificate; use axum::{Router, routing::get}; use futures_util::{FutureExt as _, pin_mut, select}; use snafu::{ResultExt, Snafu}; @@ -35,19 +38,16 @@ use tokio::{ sync::mpsc, }; use tower::ServiceBuilder; -use x509_cert::Certificate; -// use tower_http::trace::TraceLayer; +// Selected re-exports +pub use crate::options::WebhookOptions; use crate::tls::TlsServer; -pub mod constants; +pub mod maintainer; pub mod options; pub mod servers; pub mod tls; -// Selected re-exports -pub use crate::options::WebhookOptions; - /// A generic webhook handler receiving a request and sending back a response. /// /// This trait is not intended to be implemented by external crates and this @@ -86,6 +86,19 @@ pub struct WebhookServer { } impl WebhookServer { + /// The default HTTPS port `8443` + pub const DEFAULT_HTTPS_PORT: u16 = 8443; + /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, + /// which represents binding on all network addresses. + // + // TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4 + // and IPv6. However, it's complicated and depends on the underlying system... + // If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults. + pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + /// The default socket address `0.0.0.0:8443` the webhook server binds to. + pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = + SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); + /// Creates a new ready-to-use webhook server. /// /// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles diff --git a/crates/stackable-webhook/src/maintainer.rs b/crates/stackable-webhook/src/maintainer.rs new file mode 100644 index 000000000..649ac7da7 --- /dev/null +++ b/crates/stackable-webhook/src/maintainer.rs @@ -0,0 +1,249 @@ +use k8s_openapi::{ + ByteString, + apiextensions_apiserver::pkg::apis::apiextensions::v1::{ + CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, + WebhookConversion, + }, +}; +use kube::{ + Api, Client, ResourceExt, + api::{Patch, PatchParams}, +}; +use snafu::{ResultExt, Snafu, ensure}; +use tokio::sync::{mpsc, oneshot}; +use x509_cert::{ + Certificate, + der::{EncodePem, pem::LineEnding}, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("failed to encode CA certificate as PEM format"))] + EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, + + #[snafu(display("failed to send initial CRD reconcile heartbeat"))] + SendInitialReconcileHeartbeat, + + #[snafu(display("failed to patch CRD {crd_name:?}"))] + PatchCrd { + source: kube::Error, + crd_name: String, + }, +} + +/// Maintains various custom resource definitions. +/// +/// When running this, the following operations are done: +/// +/// - Apply the CRDs when starting up +/// - Reconcile the CRDs when the conversion webhook certificate is rotated +pub struct CustomResourceDefinitionMaintainer<'a> { + client: Client, + certificate_rx: mpsc::Receiver, + + definitions: Vec, + options: CustomResourceDefinitionMaintainerOptions<'a>, + + initial_reconcile_tx: oneshot::Sender<()>, +} + +impl<'a> CustomResourceDefinitionMaintainer<'a> { + /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more + /// custom resource definitions. + /// + /// ## Parameters + /// + /// This function expects four parameters: + /// + /// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches + /// the CRDs when the TLS certificate is rotated. + /// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The + /// certificate data sent through the channel is used to set the caBundle in the conversion + /// section of the CRD. + /// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained + /// by this maintainer. If the iterator is empty, the maintainer returns early without doing + /// any work. As such, a polling mechanism which waits for all futures should be used to + /// prevent premature termination of the operator. + /// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various + /// parts of the maintainer. In the future, this will be converted to a builder, to enable a + /// cleaner API interface. + /// + /// ## Return Values + /// + /// This function returns a 2-tuple (pair) of values: + /// + /// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer. + /// See [`CustomResourceDefinitionMaintainer::run`] for more details. + /// - The [`oneshot::Receiver`] which will be used to send out a message once the initial + /// CRD reconciliation ran. This signal can be used to trigger the deployment of custom + /// resources defined by the maintained CRDs. + /// + /// ## Example + /// + /// ```no_run + /// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; + /// # use tokio::sync::mpsc::channel; + /// # use x509_cert::Certificate; + /// # use kube::Client; + /// use stackable_webhook::maintainer::{ + /// CustomResourceDefinitionMaintainerOptions, + /// CustomResourceDefinitionMaintainer, + /// }; + /// + /// # #[tokio::main] + /// # async fn main() { + /// # let (certificate_tx, certificate_rx) = channel(1); + /// let options = CustomResourceDefinitionMaintainerOptions { + /// operator_name: "my-service-name", + /// operator_namespace: "my-namespace", + /// webhook_https_port: 8443, + /// disabled: true, + /// }; + /// + /// let client = Client::try_default().await.unwrap(); + /// + /// let definitions = vec![ + /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), + /// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(), + /// ]; + /// + /// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( + /// client, + /// certificate_rx, + /// definitions, + /// options, + /// ); + /// # } + /// ``` + pub fn new( + client: Client, + certificate_rx: mpsc::Receiver, + definitions: impl IntoIterator, + options: CustomResourceDefinitionMaintainerOptions<'a>, + ) -> (Self, oneshot::Receiver<()>) { + let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); + + let maintainer = Self { + definitions: definitions.into_iter().collect(), + initial_reconcile_tx, + certificate_rx, + options, + client, + }; + + (maintainer, initial_reconcile_rx) + } + + /// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously. + /// + /// This needs to be polled in parallel with other parts of an operator, like controllers or + /// webhook servers. If it is disabled, the returned future immediately resolves to + /// [`std::task::Poll::Ready`] and thus doesn't consume any resources. + pub async fn run(mut self) -> Result<(), Error> { + let CustomResourceDefinitionMaintainerOptions { + operator_namespace, + webhook_https_port, + operator_name, + disabled, + } = self.options; + + // If the maintainer is disabled or there are no custom resource definitions, immediately + // return without doing any work. + if disabled || self.definitions.is_empty() { + return Ok(()); + } + + // This channel can only be used exactly once. The sender's send method consumes self, and + // as such, the sender is wrapped in an Option to be able to call take to consume the inner + // value. + let mut initial_reconcile_tx = Some(self.initial_reconcile_tx); + + // This get's polled by the async runtime on a regular basis (or when woken up). Once we + // receive a message containing the newly generated TLS certificate for the conversion + // webhook, we need to update the caBundle in the CRD. + while let Some(certificate) = self.certificate_rx.recv().await { + tracing::info!( + k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::>(), + "reconciling custom resource definitions" + ); + + // The caBundle needs to be provided as a base64-encoded PEM envelope. + let ca_bundle = certificate + .to_pem(LineEnding::LF) + .context(EncodeCertificateAuthorityAsPemSnafu)?; + + let crd_api: Api = Api::all(self.client.clone()); + + for crd in self.definitions.iter_mut() { + let crd_kind = &crd.spec.names.kind; + let crd_name = crd.name_any(); + + tracing::debug!( + k8s.crd.kind = crd_kind, + k8s.crd.name = crd_name, + "reconciling custom resource definition" + ); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_owned(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are + // supported by the webhook. The first version in the list understood by the + // API server is sent to the webhook. The webhook must respond with a + // ConversionReview object in the same version it received. We only support + // the stable v1 ConversionReview to keep the implementation as simple as + // possible. + conversion_review_versions: vec!["v1".to_owned()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: operator_name.to_owned(), + namespace: operator_namespace.to_owned(), + path: Some(format!("/convert/{crd_name}")), + port: Some(webhook_https_port.into()), + }), + // Here, ByteString takes care of encoding the provided content as + // base64. + ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), + url: None, + }), + }), + }); + + // Deploy the updated CRDs using a server-side apply. + let patch = Patch::Apply(&crd); + let patch_params = PatchParams::apply(operator_name); + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| PatchCrdSnafu { crd_name })?; + } + + // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out + // via the oneshot channel. + if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { + ensure!( + initial_reconcile_tx.send(()).is_ok(), + SendInitialReconcileHeartbeatSnafu + ); + } + } + + Ok(()) + } +} + +// TODO (@Techassi): Make this a builder instead +/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. +pub struct CustomResourceDefinitionMaintainerOptions<'a> { + /// The service name used by the operator/conversion webhook and as a field manager. + pub operator_name: &'a str, + + /// The namespace the operator/conversion webhook runs in. + pub operator_namespace: &'a str, + + /// The HTTPS port the conversion webhook listens on. + pub webhook_https_port: u16, + + /// Indicates if the maintainer should be disabled. + pub disabled: bool, +} diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs index 90623b093..b7eeaffea 100644 --- a/crates/stackable-webhook/src/options.rs +++ b/crates/stackable-webhook/src/options.rs @@ -1,4 +1,5 @@ -//! Contains available options to configure the [WebhookServer][crate::WebhookServer]. +//! Contains available options to configure the [WebhookServer]. + use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, @@ -6,7 +7,7 @@ use std::{ use stackable_certs::PrivateKeyType; -use crate::constants::DEFAULT_SOCKET_ADDRESS; +use crate::WebhookServer; /// Specifies available webhook server options. /// @@ -82,7 +83,9 @@ impl WebhookOptionsBuilder { /// Sets the IP address of the socket address the webhook server uses to /// bind for HTTPS. pub fn bind_ip(mut self, bind_ip: impl Into) -> Self { - let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS); + let addr = self + .socket_addr + .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); addr.set_ip(bind_ip.into()); self } @@ -90,7 +93,9 @@ impl WebhookOptionsBuilder { /// Sets the port of the socket address the webhook server uses to bind /// for HTTPS. pub fn bind_port(mut self, bind_port: u16) -> Self { - let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS); + let addr = self + .socket_addr + .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); addr.set_port(bind_port); self } @@ -119,7 +124,9 @@ impl WebhookOptionsBuilder { /// explicitly set option. pub fn build(self) -> WebhookOptions { WebhookOptions { - socket_addr: self.socket_addr.unwrap_or(DEFAULT_SOCKET_ADDRESS), + socket_addr: self + .socket_addr + .unwrap_or(WebhookServer::DEFAULT_SOCKET_ADDRESS), subject_alterative_dns_names: self.subject_alterative_dns_names, } } diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs index ca83f6078..9b4beab2e 100644 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ b/crates/stackable-webhook/src/servers/conversion.rs @@ -1,32 +1,21 @@ use std::{fmt::Debug, net::SocketAddr}; use axum::{Json, Router, routing::post}; -use k8s_openapi::{ - ByteString, - apiextensions_apiserver::pkg::apis::apiextensions::v1::{ - CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, - WebhookConversion, - }, -}; +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; // Re-export this type because users of the conversion webhook server require // this type to write the handler function. Instead of importing this type from // kube directly, consumers can use this type instead. This also eliminates // keeping the kube dependency version in sync between here and the operator. pub use kube::core::conversion::ConversionReview; -use kube::{ - Api, Client, ResourceExt, - api::{Patch, PatchParams}, -}; -use snafu::{OptionExt, ResultExt, Snafu}; -use tokio::{sync::mpsc, try_join}; +use kube::{Client, ResourceExt}; +use snafu::{ResultExt, Snafu}; +use tokio::sync::{mpsc, oneshot}; use tracing::instrument; -use x509_cert::{ - Certificate, - der::{EncodePem, pem::LineEnding}, -}; +use x509_cert::Certificate; use crate::{ - WebhookError, WebhookHandler, WebhookServer, constants::CONVERSION_WEBHOOK_HTTPS_PORT, + WebhookError, WebhookHandler, WebhookServer, + maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions}, options::WebhookOptions, }; @@ -68,126 +57,98 @@ where // TODO: Add a builder, maybe with `bon`. #[derive(Debug)] -pub struct ConversionWebhookOptions { +pub struct ConversionWebhookOptions<'a> { /// The bind address to bind the HTTPS server to. pub socket_addr: SocketAddr, /// The namespace the operator/webhook is running in. - pub namespace: String, + pub namespace: &'a str, /// The name of the Kubernetes service which points to the operator/webhook. - pub service_name: String, - - /// If the CRDs should be maintained automatically. Use the (negated) value from - /// `stackable_operator::cli::ProductOperatorRun::disable_crd_maintenance` - /// for this. - // # Because of https://github.com/rust-lang/cargo/issues/3475 we can not use a real link here - pub maintain_crds: bool, - - /// The field manager used to apply Kubernetes objects, typically the operator name, e.g. - /// `airflow-operator`. - pub field_manager: String, + pub service_name: &'a str, } /// A ready-to-use CRD conversion webhook server. /// /// See [`ConversionWebhookServer::new()`] for usage examples. -pub struct ConversionWebhookServer { - crds: Vec, - options: ConversionWebhookOptions, - router: Router, - client: Client, -} +pub struct ConversionWebhookServer(WebhookServer); impl ConversionWebhookServer { - /// Creates a new conversion webhook server, which expects POST requests being made to the - /// `/convert/{crd name}` endpoint. + /// The default socket address the conversion webhook server binds to, see + /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`]. + pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS; + + /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being + /// made to the `/convert/{CRD_NAME}` endpoint. /// - /// You need to provide a few things for every CRD passed in via the `crds_and_handlers` argument: + /// The TLS certificate is automatically generated and rotated. /// - /// 1. The CRD - /// 2. A conversion function to convert between CRD versions. Typically you would use the - /// the auto-generated `try_convert` function on CRD spec definition structs for this. - /// 3. A [`kube::Client`] used to create/update the CRDs. + /// ## Parameters /// - /// The [`ConversionWebhookServer`] takes care of reconciling the CRDs into the Kubernetes - /// cluster and takes care of adding itself as conversion webhook. This includes TLS - /// certificates and CA bundles. + /// This function expects the following parameters: /// - /// # Example + /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] + /// to a handler function. In most cases, the generated `CustomResource::try_merge` function + /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` + /// signature. + /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the + /// webhook server, eg. the socket address used to listen on. /// - /// ```no_run - /// use clap::Parser; - /// use stackable_webhook::{ - /// servers::{ConversionWebhookServer, ConversionWebhookOptions}, - /// constants::CONVERSION_WEBHOOK_HTTPS_PORT, - /// WebhookOptions - /// }; - /// use stackable_operator::{ - /// kube::Client, - /// crd::s3::{S3Connection, S3ConnectionVersion}, - /// cli::{RunArguments, MaintenanceOptions}, - /// }; + /// ## Return Values + /// + /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`] + /// variant: /// - /// # async fn test() { - /// // Things that should already be in you operator: - /// const OPERATOR_NAME: &str = "product-operator"; - /// let client = Client::try_default().await.expect("failed to create Kubernetes client"); - /// let RunArguments { - /// operator_environment, - /// maintenance: MaintenanceOptions { - /// disable_crd_maintenance, - /// .. - /// }, - /// .. - /// } = RunArguments::parse(); + /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See + /// [`ConversionWebhookServer::run`] for more details. + /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly + /// generated TLS certificate. This channel is used by the CRD maintainer to trigger a + /// reconcile of the CRDs it maintains. /// - /// let crds_and_handlers = [ + /// ## Example + /// + /// ```no_run + /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; + /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; + /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; + /// + /// # #[tokio::main] + /// # async fn main() { + /// # CryptoProvider::install_default(default_provider()).unwrap(); + /// let crds_and_handlers = vec![ /// ( /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("failed to merge S3Connection CRD"), - /// S3Connection::try_convert as fn(_) -> _, - /// ), + /// .expect("the S3Connection CRD must be merged"), + /// S3Connection::try_convert, + /// ) /// ]; /// /// let options = ConversionWebhookOptions { - /// socket_addr: format!("0.0.0.0:{CONVERSION_WEBHOOK_HTTPS_PORT}") - /// .parse() - /// .expect("static address is always valid"), - /// namespace: operator_environment.operator_namespace, - /// service_name: operator_environment.operator_service_name, - /// maintain_crds: !disable_crd_maintenance, - /// field_manager: OPERATOR_NAME.to_owned(), + /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS, + /// namespace: "stackable-operators", + /// service_name: "product-operator", /// }; /// - /// // Construct the conversion webhook server - /// let conversion_webhook = ConversionWebhookServer::new( - /// crds_and_handlers, - /// options, - /// client, - /// ) - /// .await - /// .expect("failed to create ConversionWebhookServer"); + /// let (conversion_webhook_server, _certificate_rx) = + /// ConversionWebhookServer::new(crds_and_handlers, options) + /// .await + /// .unwrap(); /// - /// conversion_webhook.run().await.expect("failed to run ConversionWebhookServer"); + /// conversion_webhook_server.run().await.unwrap(); /// # } /// ``` - #[instrument( - name = "create_conversion_webhook_server", - skip(crds_and_handlers, client) - )] + #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] pub async fn new( crds_and_handlers: impl IntoIterator, - options: ConversionWebhookOptions, - client: Client, - ) -> Result + options: ConversionWebhookOptions<'_>, + ) -> Result<(Self, mpsc::Receiver), ConversionWebhookError> where H: WebhookHandler + Clone + Send + Sync + 'static, { tracing::debug!("create new conversion webhook server"); let mut router = Router::new(); - let mut crds = Vec::new(); + for (crd, handler) in crds_and_handlers { let crd_name = crd.name_any(); let handler_fn = |Json(review): Json| async { @@ -195,39 +156,21 @@ impl ConversionWebhookServer { Json(review) }; + // TODO (@Techassi): Make this part of the trait mentioned above let route = format!("/convert/{crd_name}"); router = router.route(&route, post(handler_fn)); - crds.push(crd); } - Ok(Self { - options, - router, - client, - crds, - }) - } - - pub async fn run(self) -> Result<(), ConversionWebhookError> { - tracing::info!("starting conversion webhook server"); - - let Self { - options, - router, - client, - crds, - } = self; - let ConversionWebhookOptions { socket_addr, namespace: operator_namespace, service_name: operator_service_name, - maintain_crds, - field_manager, } = &options; // This is how Kubernetes calls us, so it decides about the naming. // AFAIK we can not influence this, so this is the only SAN entry needed. + // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) + // HTTP clients can use the FQDN of the service for testing or user use-cases. let subject_alterative_dns_name = format!("{operator_service_name}.{operator_namespace}.svc",); @@ -236,127 +179,71 @@ impl ConversionWebhookServer { socket_addr: *socket_addr, }; - let (server, mut cert_rx) = WebhookServer::new(router, webhook_options) + let (server, certificate_rx) = WebhookServer::new(router, webhook_options) .await .context(CreateWebhookServerSnafu)?; - // We block the ConversionWebhookServer creation until the certificates have been generated. - // This way we - // 1. Are able to apply the CRDs before we start the actual controllers relying on them - // 2. Avoid updating them shortly after as cert have been generated. Doing so would cause - // unnecessary "too old resource version" errors in the controllers as the CRD was updated. - let current_cert = cert_rx - .recv() - .await - .context(ReceiveCertificateFromChannelSnafu)?; + Ok((Self(server), certificate_rx)) + } - if *maintain_crds { - Self::reconcile_crds( - &client, - field_manager, - &crds, - operator_namespace, - operator_service_name, - current_cert, - ) - .await - .context(ReconcileCrdsSnafu)?; + /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`], + /// and a [`oneshot::Receiver`]. + /// + /// See the referenced items for more details on usage. + pub async fn with_maintainer<'a, H>( + // TODO (@Techassi): Use a trait type here which can be used to build all part of the + // conversion webhook server and a CRD maintainer. + crds_and_handlers: impl IntoIterator + Clone, + operator_name: &'a str, + operator_namespace: &'a str, + disable_maintainer: bool, + client: Client, + ) -> Result< + ( + Self, + CustomResourceDefinitionMaintainer<'a>, + oneshot::Receiver<()>, + ), + ConversionWebhookError, + > + where + H: WebhookHandler + Clone + Send + Sync + 'static, + { + let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS; - try_join!( - Self::run_webhook_server(server), - Self::run_crd_reconciliation_loop( - cert_rx, - &client, - field_manager, - &crds, - operator_namespace, - operator_service_name, - ), - )?; - } else { - Self::run_webhook_server(server).await?; + // TODO (@Techassi): These should be moved into a builder + let webhook_options = ConversionWebhookOptions { + namespace: operator_namespace, + service_name: operator_name, + socket_addr, }; - Ok(()) - } + let (conversion_webhook_server, certificate_rx) = + Self::new(crds_and_handlers.clone(), webhook_options).await?; - async fn run_webhook_server(server: WebhookServer) -> Result<(), ConversionWebhookError> { - server.run().await.context(RunWebhookServerSnafu) - } + let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd); - async fn run_crd_reconciliation_loop( - mut cert_rx: mpsc::Receiver, - client: &Client, - field_manager: &str, - crds: &[CustomResourceDefinition], - operator_namespace: &str, - operator_service_name: &str, - ) -> Result<(), ConversionWebhookError> { - while let Some(current_cert) = cert_rx.recv().await { - Self::reconcile_crds( - client, - field_manager, - crds, - operator_namespace, - operator_service_name, - current_cert, - ) - .await - .context(ReconcileCrdsSnafu)?; - } - Ok(()) - } + // TODO (@Techassi): These should be moved into a builder + let maintainer_options = CustomResourceDefinitionMaintainerOptions { + webhook_https_port: socket_addr.port(), + disabled: disable_maintainer, + operator_namespace, + operator_name, + }; - #[instrument(skip_all)] - async fn reconcile_crds( - client: &Client, - field_manager: &str, - crds: &[CustomResourceDefinition], - operator_namespace: &str, - operator_service_name: &str, - current_cert: Certificate, - ) -> Result<(), ConversionWebhookError> { - tracing::info!( - crds = ?crds.iter().map(CustomResourceDefinition::name_any).collect::>(), - "Reconciling CRDs" + let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( + client, + certificate_rx, + definitions, + maintainer_options, ); - let ca_bundle = current_cert - .to_pem(LineEnding::LF) - .context(ConvertCaToPemSnafu)?; - let crd_api: Api = Api::all(client.clone()); - for mut crd in crds.iter().cloned() { - let crd_name = crd.name_any(); - - crd.spec.conversion = Some(CustomResourceConversion { - strategy: "Webhook".to_string(), - webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook. - // The first version in the list understood by the API server is sent to the webhook. - // The webhook must respond with a ConversionReview object in the same version it received. - conversion_review_versions: vec!["v1".to_string()], - client_config: Some(WebhookClientConfig { - service: Some(ServiceReference { - name: operator_service_name.to_owned(), - namespace: operator_namespace.to_owned(), - path: Some(format!("/convert/{crd_name}")), - port: Some(CONVERSION_WEBHOOK_HTTPS_PORT.into()), - }), - ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), - url: None, - }), - }), - }); + Ok((conversion_webhook_server, maintainer, initial_reconcile_rx)) + } - let patch = Patch::Apply(&crd); - let patch_params = PatchParams::apply(field_manager); - crd_api - .patch(&crd_name, &patch_params, &patch) - .await - .with_context(|_| UpdateCrdSnafu { - crd_name: crd_name.to_string(), - })?; - } - Ok(()) + /// Runs the [`ConversionWebhookServer`] asynchronously. + pub async fn run(self) -> Result<(), ConversionWebhookError> { + tracing::info!("run conversion webhook server"); + self.0.run().await.context(RunWebhookServerSnafu) } } diff --git a/crates/stackable-webhook/src/tls/cert_resolver.rs b/crates/stackable-webhook/src/tls/cert_resolver.rs index 6320c0ce8..d355b956c 100644 --- a/crates/stackable-webhook/src/tls/cert_resolver.rs +++ b/crates/stackable-webhook/src/tls/cert_resolver.rs @@ -44,7 +44,7 @@ pub enum CertificateResolverError { TokioSpawnBlocking { source: tokio::task::JoinError }, #[snafu(display("no default rustls CryptoProvider installed"))] - NoDefaultCryptoProviderInstalled {}, + NoDefaultCryptoProviderInstalled, } /// This struct serves as [`ResolvesServerCert`] to always hand out the current certificate for TLS @@ -59,23 +59,25 @@ pub struct CertificateResolver { current_certified_key: ArcSwap, subject_alterative_dns_names: Arc>, - cert_tx: mpsc::Sender, + certificate_tx: mpsc::Sender, } impl CertificateResolver { pub async fn new( subject_alterative_dns_names: Vec, - cert_tx: mpsc::Sender, + certificate_tx: mpsc::Sender, ) -> Result { let subject_alterative_dns_names = Arc::new(subject_alterative_dns_names); - let certified_key = - Self::generate_new_certificate_inner(subject_alterative_dns_names.clone(), &cert_tx) - .await?; + let certified_key = Self::generate_new_certificate_inner( + subject_alterative_dns_names.clone(), + &certificate_tx, + ) + .await?; Ok(Self { subject_alterative_dns_names, current_certified_key: ArcSwap::new(certified_key), - cert_tx, + certificate_tx, }) } @@ -90,7 +92,8 @@ impl CertificateResolver { async fn generate_new_certificate(&self) -> Result> { let subject_alterative_dns_names = self.subject_alterative_dns_names.clone(); - Self::generate_new_certificate_inner(subject_alterative_dns_names, &self.cert_tx).await + Self::generate_new_certificate_inner(subject_alterative_dns_names, &self.certificate_tx) + .await } /// Creates a new certificate and returns the certified key. @@ -102,7 +105,7 @@ impl CertificateResolver { /// See [the relevant decision](https://github.com/stackabletech/decisions/issues/56) async fn generate_new_certificate_inner( subject_alterative_dns_names: Arc>, - cert_tx: &mpsc::Sender, + certificate_tx: &mpsc::Sender, ) -> Result> { // The certificate generations can take a while, so we use `spawn_blocking` let (cert, certified_key) = tokio::task::spawn_blocking(move || { @@ -141,7 +144,7 @@ impl CertificateResolver { .await .context(TokioSpawnBlockingSnafu)??; - cert_tx + certificate_tx .send(cert) .await .map_err(|_err| CertificateResolverError::SendCertificateToChannel)?; diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index a796e8b38..cfd7c2e86 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -11,7 +11,7 @@ use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::trace::{FutureExt, SpanKind}; use opentelemetry_semantic_conventions as semconv; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use stackable_shared::time::Duration; use tokio::{ net::{TcpListener, TcpStream}, @@ -21,7 +21,7 @@ use tokio_rustls::{ TlsAcceptor, rustls::{ ServerConfig, - crypto::ring::default_provider, + crypto::CryptoProvider, version::{TLS12, TLS13}, }, }; @@ -59,6 +59,9 @@ pub enum TlsServerError { #[snafu(display("failed to set safe TLS protocol versions"))] SetSafeTlsProtocolVersions { source: tokio_rustls::rustls::Error }, + + #[snafu(display("no default rustls CryptoProvider installed"))] + NoDefaultCryptoProviderInstalled, } /// A server which terminates TLS connections and allows clients to communicate @@ -85,20 +88,22 @@ impl TlsServer { router: Router, options: WebhookOptions, ) -> Result<(Self, mpsc::Receiver)> { - let (cert_tx, cert_rx) = mpsc::channel(1); + let (certificate_tx, certificate_rx) = mpsc::channel(1); let WebhookOptions { socket_addr, subject_alterative_dns_names, } = options; - let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, cert_tx) + let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, certificate_tx) .await .context(CreateCertificateResolverSnafu)?; let cert_resolver = Arc::new(cert_resolver); - let tls_provider = default_provider(); - let mut config = ServerConfig::builder_with_provider(tls_provider.into()) + let tls_provider = + CryptoProvider::get_default().context(NoDefaultCryptoProviderInstalledSnafu)?; + + let mut config = ServerConfig::builder_with_provider(tls_provider.clone()) .with_protocol_versions(&[&TLS12, &TLS13]) .context(SetSafeTlsProtocolVersionsSnafu)? .with_no_client_auth() @@ -112,7 +117,7 @@ impl TlsServer { router, }; - Ok((tls_server, cert_rx)) + Ok((tls_server, certificate_rx)) } /// Runs the TLS server by listening for incoming TCP connections on the