Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ repository = "https://github.com/stackabletech/operator-rs"
[workspace.dependencies]
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.8.0" }

arc-swap = "1.7"
arc-swap = "1.7.0"
async-trait = "0.1.89"
axum = { version = "0.8.1", features = ["http2"] }
chrono = { version = "0.4.38", default-features = false }
clap = { version = "4.5.17", features = ["derive", "cargo", "env"] }
Expand All @@ -38,7 +39,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = ["schem
# We use rustls instead of openssl for easier portability, e.g. so that we can build stackablectl without the need to vendor (build from source) openssl
# We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev"
# We pin the kube version, as we use a patch for 2.0.1 below
kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "rustls-tls", "ring"] }
kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] }
opentelemetry = "0.31.0"
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] }
opentelemetry-appender-tracing = "0.31.0"
Expand Down
11 changes: 11 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add support for mutating webhooks ([#1119]).

### Changed

- BREAKING: Refactor the entire `WebhookServer` mechanism, so multiple webhooks can run in parallel.
Put individual webhooks (currently `ConversionWebhook` and `MutatingWebhook`) behind the `Webhook` trait ([#1119]).

[#1119]: https://github.com/stackabletech/operator-rs/pull/1119

## [0.7.1] - 2025-10-31

### Fixed
Expand Down
2 changes: 2 additions & 0 deletions crates/stackable-webhook/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ stackable-shared = { path = "../stackable-shared" }
stackable-telemetry = { path = "../stackable-telemetry" }

arc-swap.workspace = true
async-trait.workspace = true
axum.workspace = true
futures-util.workspace = true
hyper-util.workspace = true
Expand All @@ -21,6 +22,7 @@ kube.workspace = true
opentelemetry.workspace = true
opentelemetry-semantic-conventions.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio-rustls.workspace = true
Expand Down
232 changes: 125 additions & 107 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,92 +1,99 @@
//! Utility types and functions to easily create ready-to-use webhook servers
//! which can handle different tasks, for example CRD conversions. All webhook
//! servers use HTTPS by default. This library is fully compatible with the
//! [`tracing`] crate and emits debug level tracing data.
//! Utility types and functions to easily create ready-to-use webhook servers which can handle
//! different tasks. All webhook servers use HTTPS by default.
//!
//! Most users will only use the top-level exported generic [`WebhookServer`]
//! which enables complete control over the [Router] which handles registering
//! routes and their handler functions.
//! Currently the following webhooks are supported:
//!
//! ```
//! use stackable_webhook::{WebhookServer, WebhookOptions};
//! use axum::Router;
//! * [webhooks::ConversionWebhook]
//! * [webhooks::MutatingWebhook]
//! * In the future validating webhooks wil be added
//!
//! # async fn test() {
//! let router = Router::new();
//! let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default())
//! .await
//! .expect("failed to create WebhookServer");
//! # }
//! ```
//! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data.
//!
//! For some usages, complete end-to-end [`WebhookServer`] implementations
//! exist. One such implementation is the [`ConversionWebhookServer`][1].
//!
//! This library additionally also exposes lower-level structs and functions to
//! enable complete control over these details if needed.
//!
//! [1]: crate::servers::ConversionWebhookServer
//! For usage please look at the [`WebhookServer`] docs as well as the specific [`Webhook`] you are
//! using.
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use ::x509_cert::Certificate;
use axum::{Router, routing::get};
use futures_util::{FutureExt as _, pin_mut, select};
use futures_util::{FutureExt as _, TryFutureExt, select};
use k8s_openapi::ByteString;
use snafu::{ResultExt, Snafu};
use stackable_telemetry::AxumTraceLayer;
use tokio::{
signal::unix::{SignalKind, signal},
sync::mpsc,
try_join,
};
use tower::ServiceBuilder;
use webhooks::{Webhook, WebhookError};
use x509_cert::der::{EncodePem, pem::LineEnding};

// Selected re-exports
pub use crate::options::WebhookOptions;
use crate::tls::TlsServer;

pub mod maintainer;
pub mod options;
pub mod servers;
pub mod tls;

/// A generic webhook handler receiving a request and sending back a response.
///
/// This trait is not intended to be implemented by external crates and this
/// library provides various ready-to-use implementations for it. One such an
/// implementation is part of the [`ConversionWebhookServer`][1].
///
/// [1]: crate::servers::ConversionWebhookServer
pub trait WebhookHandler<Req, Res> {
fn call(self, req: Req) -> Res;
}
pub mod webhooks;

/// A result type alias with the [`WebhookError`] type as the default error type.
pub type Result<T, E = WebhookError> = std::result::Result<T, E>;
pub type Result<T, E = WebhookServerError> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
pub enum WebhookError {
pub enum WebhookServerError {
#[snafu(display("failed to create TLS server"))]
CreateTlsServer { source: tls::TlsServerError },

#[snafu(display("failed to run TLS server"))]
RunTlsServer { source: tls::TlsServerError },

#[snafu(display("failed to update certificate"))]
UpdateCertificate { source: WebhookError },

#[snafu(display("failed to encode CA certificate as PEM format"))]
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },
}

/// A ready-to-use webhook server.
/// An HTTPS server that serves one or more webhooks.
///
/// It also handles TLS certificate rotation.
///
/// This server abstracts away lower-level details like TLS termination
/// and other various configurations, validations or middlewares. The routes
/// and their handlers are completely customizable by bringing your own
/// Axum [`Router`].
/// ### Example usage
///
/// For complete end-to-end implementations, see [`ConversionWebhookServer`][1].
/// ```
/// use stackable_webhook::WebhookServer;
/// use stackable_webhook::WebhookServerOptions;
/// use stackable_webhook::webhooks::Webhook;
///
/// [1]: crate::servers::ConversionWebhookServer
/// # async fn docs() {
/// let mut webhooks: Vec<Box<dyn Webhook>> = vec![];
///
/// let webhook_options = WebhookServerOptions {
/// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS,
/// webhook_namespace: "my-namespace".to_owned(),
/// webhook_service_name: "my-operator".to_owned(),
/// };
/// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap();
/// # }
/// ```
pub struct WebhookServer {
options: WebhookServerOptions,
webhooks: Vec<Box<dyn Webhook>>,
tls_server: TlsServer,
cert_rx: mpsc::Receiver<Certificate>,
}

#[derive(Clone, Debug)]
pub struct WebhookServerOptions {
/// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to.
pub socket_addr: SocketAddr,

/// The namespace the webhook is running in.
pub webhook_namespace: String,

/// The name of the Kubernetes service which points to the webhook.
pub webhook_service_name: String,
}

impl WebhookServer {
/// The default HTTPS port `8443`
/// The default HTTPS port
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
/// which represents binding on all network addresses.
Expand All @@ -99,52 +106,13 @@ impl WebhookServer {
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);

/// Creates a new ready-to-use webhook server.
/// Creates a new webhook server with the given config and list of [`Webhook`]s.
///
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles
/// routing based on the provided Axum `router`. Most of the time it is sufficient to use
/// [`WebhookOptions::default()`]. See the documentation for [`WebhookOptions`] for more details
/// on the default values.
///
/// To start the server, use the [`WebhookServer::run()`] function. This will
/// run the server using the Tokio runtime until it is terminated.
///
/// ### Basic Example
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookOptions};
/// use axum::Router;
///
/// # async fn test() {
/// let router = Router::new();
/// let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default())
/// .await
/// .expect("failed to create WebhookServer");
/// # }
/// ```
///
/// ### Example with Custom Options
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookOptions};
/// use axum::Router;
///
/// # async fn test() {
/// let options = WebhookOptions::builder()
/// .bind_address([127, 0, 0, 1], 8080)
/// .add_subject_alterative_dns_name("my-san-entry")
/// .build();
///
/// let router = Router::new();
/// let (server, cert_rx) = WebhookServer::new(router, options)
/// .await
/// .expect("failed to create WebhookServer");
/// # }
/// ```
/// Please read their documentation for details.
pub async fn new(
router: Router,
options: WebhookOptions,
) -> Result<(Self, mpsc::Receiver<Certificate>)> {
options: WebhookServerOptions,
webhooks: Vec<Box<dyn Webhook>>,
) -> Result<Self> {
tracing::trace!("create new webhook server");

// TODO (@Techassi): Make opt-in configurable from the outside
Expand All @@ -156,22 +124,33 @@ impl WebhookServer {
// by the Axum project.
//
// See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware
// TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important
let service_builder = ServiceBuilder::new().layer(trace_layer);
let trace_service_builder = ServiceBuilder::new().layer(trace_layer);

// Create the root router and merge the provided router into it.
tracing::debug!("create core router and merge provided router");
let mut router = Router::new();
for webhook in &webhooks {
router = webhook.register_routes(router);
}

let router = router
.layer(service_builder)
// Enrich spans for routes added above.
// Routes defined below it will not be instrumented to reduce noise.
.layer(trace_service_builder)
// The health route is below the AxumTraceLayer so as not to be instrumented
.route("/health", get(|| async { "ok" }));

tracing::debug!("create TLS server");
let (tls_server, cert_rx) = TlsServer::new(router, options)
let (tls_server, cert_rx) = TlsServer::new(router, &options)
.await
.context(CreateTlsServerSnafu)?;

Ok((Self { tls_server }, cert_rx))
Ok(Self {
options,
webhooks,
tls_server,
cert_rx,
})
}

/// Runs the Webhook server and sets up signal handlers for shutting down.
Expand Down Expand Up @@ -200,19 +179,58 @@ impl WebhookServer {
};

// select requires Future + Unpin
pin_mut!(future_server);
pin_mut!(future_signal);

futures_util::future::select(future_server, future_signal).await;
tokio::pin!(future_server);
tokio::pin!(future_signal);

tokio::select! {
res = &mut future_server => {
// If the server future errors, propagate the error
res?;
}
_ = &mut future_signal => {
tracing::info!("shutdown signal received, stopping webhook server");
}
}

Ok(())
}

/// Runs the webhook server by creating a TCP listener and binding it to
/// the specified socket address.
async fn run_server(self) -> Result<()> {
tracing::debug!("run webhook server");

self.tls_server.run().await.context(RunTlsServerSnafu)
let Self {
options,
mut webhooks,
tls_server,
mut cert_rx,
} = self;
let tls_server = tls_server
.run()
.map_err(|err| WebhookServerError::RunTlsServer { source: err });

let cert_update_loop = async {
loop {
while let Some(cert) = cert_rx.recv().await {
// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = cert
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;
let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec());

for webhook in webhooks.iter_mut() {
webhook
.handle_certificate_rotation(&cert, &ca_bundle, &options)
.await
.context(UpdateCertificateSnafu)?;
}
}
}

// We need to hint the return type to the compiler
#[allow(unreachable_code)]
Ok(())
};

try_join!(cert_update_loop, tls_server).map(|_| ())
}
}
Loading