Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file.

### Added

- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS
certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot`
channel which can be used to trigger creation/patching of any default custom resources deployed by
the operator ([#1099]).
- Add a `Client::create_if_missing` associated function to create a resource if it doesn't
exist ([#1099]).
- Add end-of-support checker ([#1096]).
- The EoS checker can be constructed using `EndOfSupportChecker::new()`.
- Add new `MaintenanceOptions` and `EndOfSupportOptions` structs.
Expand All @@ -24,6 +30,7 @@ All notable changes to this project will be documented in this file.

[#1096]: https://github.com/stackabletech/operator-rs/pull/1096
[#1098]: https://github.com/stackabletech/operator-rs/pull/1098
[#1099]: https://github.com/stackabletech/operator-rs/pull/1099

## [0.98.0] - 2025-09-22

Expand Down
5 changes: 3 additions & 2 deletions crates/stackable-operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ full = ["certs", "telemetry", "versioned", "time", "webhook", "clap"]
default = ["telemetry", "versioned", "clap"]

clap = []
certs = ["dep:stackable-certs"]
certs = ["dep:stackable-certs", "dep:x509-cert"]
telemetry = ["dep:stackable-telemetry"]
time = ["stackable-shared/time"]
versioned = ["dep:stackable-versioned"]
webhook = ["dep:stackable-webhook"]
webhook = ["dep:stackable-webhook", "dep:x509-cert"]

[dependencies]
stackable-certs = { path = "../stackable-certs", optional = true }
Expand Down Expand Up @@ -53,6 +53,7 @@ tracing.workspace = true
tracing-appender.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
x509-cert = { workspace = true, optional = true }

[dev-dependencies]
rstest.workspace = true
Expand Down
19 changes: 19 additions & 0 deletions crates/stackable-operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ impl Client {
})
}

/// Optionally creates a resource if it does not exist yet.
///
/// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`].
/// This function either returns the existing resource or the newly created one.
pub async fn create_if_missing<T>(&self, resource: &T) -> Result<T>
where
T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi,
<T as Resource>::DynamicType: Default,
{
if let Some(r) = self
.get_opt(&resource.name_any(), resource.get_namespace())
.await?
{
return Ok(r);
}

self.create(resource).await
}

/// Patches a resource using the `MERGE` patch strategy described
/// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386)
/// This will fail for objects that do not exist yet.
Expand Down
213 changes: 213 additions & 0 deletions crates/stackable-operator/src/crd/maintainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
use k8s_openapi::{
ByteString,
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
WebhookConversion,
},
};
use kube::{
Api, Client, ResourceExt,
api::{Patch, PatchParams},
};
use snafu::{ResultExt, Snafu};
use tokio::sync::{mpsc, oneshot};
use x509_cert::{
Certificate,
der::{EncodePem, pem::LineEnding},
};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("failed to encode CA certificate as PEM format"))]
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },

#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
SendInitialReconcileHeartbeat,

#[snafu(display("failed to patch CRD {crd_name:?}"))]
PatchCrd {
source: kube::Error,
crd_name: String,
},
}

/// Maintains various custom resource definitions.
///
/// When running this, the following operations are done:
///
/// - Apply the CRDs when starting up
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
pub struct CustomResourceDefinitionMaintainer {
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,

definitions: Vec<CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions,

initial_reconcile_tx: Option<oneshot::Sender<()>>,
}

impl CustomResourceDefinitionMaintainer {
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
/// custom resource definitions.
///
/// ## Parameters
///
/// This function expects four parameters:
///
/// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches
/// the CRDs when the TLS certificate is rotated.
/// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The
/// certificate data sent through the channel is used to set the caBundle in the conversion
/// section of the CRD.
/// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained
/// by this maintainer. If the iterator is empty, the maintainer returns early without doing
/// any work. As such, a polling mechanism which waits for all futures should be used to
/// prevent premature termination of the operator.
/// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various
/// parts of the maintainer. In the future, this will be converted to a builder, to enable a
/// cleaner API interface.
///
/// ## Return Values
///
/// This function returns a 2-tuple (pair) of values:
///
/// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer.
/// See [`CustomResourceDefinitionMaintainer::run`] for more details.
/// - The [`oneshot::Receiver`] which will be used to send out a message once the initial
/// CRD reconciliation ran. This signal can be used to trigger the deployment of custom
/// resources defined by the maintained CRDs.
pub fn new(
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions,
) -> (Self, oneshot::Receiver<()>) {
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();
let initial_reconcile_tx = Some(initial_reconcile_tx);

let maintainer = Self {
definitions: definitions.into_iter().collect(),
initial_reconcile_tx,
certificate_rx,
options,
client,
};

(maintainer, initial_reconcile_rx)
}

/// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously.
///
/// This needs to be polled in parallel with other parts of an operator, like controllers or
/// webhook servers. If it is disabled, the returned future immediately resolves to
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources.
pub async fn run(mut self) -> Result<(), Error> {
let CustomResourceDefinitionMaintainerOptions {
operator_service_name,
operator_namespace,
field_manager,
webhook_https_port: https_port,
disabled,
} = self.options;

// If the maintainer is disabled or there are no custom resource definitions, immediately
// return without doing any work.
if disabled || self.definitions.is_empty() {
return Ok(());
}

// This get's polled by the async runtime on a regular basis (or when woken up). Once we
// receive a message containing the newly generated TLS certificate for the conversion
// webhook, we need to update the caBundle in the CRD.
while let Some(certificate) = self.certificate_rx.recv().await {
tracing::info!(
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
"reconciling custom resource definitions"
);

// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = certificate
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;

let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());

for mut crd in self.definitions.iter().cloned() {
let crd_kind = &crd.spec.names.kind;
let crd_name = crd.name_any();

tracing::debug!(
k8s.crd.kind = crd_kind,
k8s.crd.name = crd_name,
"reconciling custom resource definition"
);

crd.spec.conversion = Some(CustomResourceConversion {
strategy: "Webhook".to_owned(),
webhook: Some(WebhookConversion {
// conversionReviewVersions indicates what ConversionReview versions are
// supported by the webhook. The first version in the list understood by the
// API server is sent to the webhook. The webhook must respond with a
// ConversionReview object in the same version it received. We only support
// the stable v1 ConversionReview to keep the implementation as simple as
// possible.
conversion_review_versions: vec!["v1".to_owned()],
client_config: Some(WebhookClientConfig {
service: Some(ServiceReference {
name: operator_service_name.clone(),
namespace: operator_namespace.clone(),
path: Some(format!("/convert/{crd_name}")),
port: Some(https_port.into()),
}),
// Here, ByteString takes care of encoding the provided content as
// base64.
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
url: None,
}),
}),
});

// Deploy the updated CRDs using a server-side apply.
let patch = Patch::Apply(&crd);
let patch_params = PatchParams::apply(&field_manager);
crd_api
.patch(&crd_name, &patch_params, &patch)
.await
.with_context(|_| PatchCrdSnafu { crd_name })?;
}

// After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out
// via the oneshot channel. This channel can only be used exactly once. The sender's
// send method consumes self, and as such, the sender is wrapped in an Option to be
// able to call take to consume the inner value.
if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() {
match initial_reconcile_tx.send(()) {
Ok(_) => {}
Err(_) => return SendInitialReconcileHeartbeatSnafu.fail(),
}
}
}

Ok(())
}
}

// TODO (@Techassi): Make this a builder instead
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`].
pub struct CustomResourceDefinitionMaintainerOptions {
/// The service name used by the operator/conversion webhook.
pub operator_service_name: String,

/// The namespace the operator/conversion webhook runs in.
pub operator_namespace: String,

/// The name of the field manager used for the server-side apply.
pub field_manager: String,

/// The HTTPS port the conversion webhook listens on.
pub webhook_https_port: u16,

/// Indicates if the maintainer should be disabled.
pub disabled: bool,
}
2 changes: 2 additions & 0 deletions crates/stackable-operator/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize};
pub mod authentication;
pub mod git_sync;
pub mod listener;
#[cfg(feature = "webhook")]
pub mod maintainer;
pub mod s3;

/// A reference to a product cluster (for example, a `ZookeeperCluster`)
Expand Down
15 changes: 15 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- BREAKING: The `ConversionWebhookServer` now returns a pair of values ([#1099]):
- The conversion webhook server itself
- A `mpsc::Receiver<Certificate>` to provide consumers the newly generated TLS certificate
- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on
`(Conversion)WebhookServer` instead of free-standing ones ([#1099]).

### Removed

- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions`
are removed ([#1099]).

[#1099]: https://github.com/stackabletech/operator-rs/pull/1099

## [0.6.0] - 2025-09-09

### Added
Expand Down
21 changes: 0 additions & 21 deletions crates/stackable-webhook/src/constants.rs

This file was deleted.

14 changes: 12 additions & 2 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
//! enable complete control over these details if needed.
//!
//! [1]: crate::servers::ConversionWebhookServer
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use axum::{Router, routing::get};
use futures_util::{FutureExt as _, pin_mut, select};
use snafu::{ResultExt, Snafu};
Expand All @@ -35,12 +37,11 @@ use tokio::{
sync::mpsc,
};
use tower::ServiceBuilder;
use x509_cert::Certificate;
pub use x509_cert::Certificate;

// use tower_http::trace::TraceLayer;
use crate::tls::TlsServer;

pub mod constants;
pub mod options;
pub mod servers;
pub mod tls;
Expand Down Expand Up @@ -86,6 +87,15 @@ pub struct WebhookServer {
}

impl WebhookServer {
/// The default HTTPS port `8443`
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
/// which represents binding on all network addresses.
pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
/// The default socket address `0.0.0.0:8443` the webhook server binds to.
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);

/// Creates a new ready-to-use webhook server.
///
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles
Expand Down
Loading