Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ All notable changes to this project will be documented in this file.

### Added

- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS
certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot`
channel which can for example be used to trigger creation/patching of any custom resources deployed by
the operator ([#1099]).
- Add a `Client::create_if_missing` associated function to create a resource if it doesn't
exist ([#1099]).
- Add end-of-support checker ([#1096]).
- The EoS checker can be constructed using `EndOfSupportChecker::new()`.
- Add new `MaintenanceOptions` and `EndOfSupportOptions` structs.
Expand All @@ -24,6 +30,7 @@ All notable changes to this project will be documented in this file.

[#1096]: https://github.com/stackabletech/operator-rs/pull/1096
[#1098]: https://github.com/stackabletech/operator-rs/pull/1098
[#1099]: https://github.com/stackabletech/operator-rs/pull/1099

## [0.98.0] - 2025-09-22

Expand Down
2 changes: 1 addition & 1 deletion crates/stackable-operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ stackable-operator-derive = { path = "../stackable-operator-derive" }
stackable-shared = { path = "../stackable-shared" }
stackable-telemetry = { path = "../stackable-telemetry", optional = true, features = ["clap"] }
stackable-versioned = { path = "../stackable-versioned", optional = true }
stackable-webhook = { path = "../stackable-webhook", optional = true }
stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"]}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"]}
stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"] }

Copy link
Member Author

@Techassi Techassi Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got removed in 99eae00.


chrono.workspace = true
clap.workspace = true
Expand Down
19 changes: 19 additions & 0 deletions crates/stackable-operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ impl Client {
})
}

/// Optionally creates a resource if it does not exist yet.
///
/// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`].
/// This function either returns the existing resource or the newly created one.
pub async fn create_if_missing<T>(&self, resource: &T) -> Result<T>
where
T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi,
<T as Resource>::DynamicType: Default,
{
if let Some(r) = self
.get_opt(&resource.name_any(), resource.get_namespace())
.await?
{
return Ok(r);
}

self.create(resource).await
}

/// Patches a resource using the `MERGE` patch strategy described
/// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386)
/// This will fail for objects that do not exist yet.
Expand Down
251 changes: 251 additions & 0 deletions crates/stackable-operator/src/crd/maintainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
use k8s_openapi::{
ByteString,
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
WebhookConversion,
},
};
use kube::{
Api, Client, ResourceExt,
api::{Patch, PatchParams},
};
use snafu::{ResultExt, Snafu};
use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("failed to encode CA certificate as PEM format"))]
EncodeCertificateAuthorityAsPem { source: x509_cert::Error },

#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
SendInitialReconcileHeartbeat,

#[snafu(display("failed to patch CRD {crd_name:?}"))]
PatchCrd {
source: kube::Error,
crd_name: String,
},
}

/// Maintains various custom resource definitions.
///
/// When running this, the following operations are done:
///
/// - Apply the CRDs when starting up
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
pub struct CustomResourceDefinitionMaintainer {
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,

definitions: Vec<CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions,

initial_reconcile_tx: oneshot::Sender<()>,
}

impl CustomResourceDefinitionMaintainer {
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
/// custom resource definitions.
///
/// ## Parameters
///
/// This function expects four parameters:
///
/// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches
/// the CRDs when the TLS certificate is rotated.
/// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The
/// certificate data sent through the channel is used to set the caBundle in the conversion
/// section of the CRD.
/// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained
/// by this maintainer. If the iterator is empty, the maintainer returns early without doing
/// any work. As such, a polling mechanism which waits for all futures should be used to
/// prevent premature termination of the operator.
/// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various
/// parts of the maintainer. In the future, this will be converted to a builder, to enable a
/// cleaner API interface.
///
/// ## Return Values
///
/// This function returns a 2-tuple (pair) of values:
///
/// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer.
/// See [`CustomResourceDefinitionMaintainer::run`] for more details.
/// - The [`oneshot::Receiver`] which will be used to send out a message once the initial
/// CRD reconciliation ran. This signal can be used to trigger the deployment of custom
/// resources defined by the maintained CRDs.
///
/// ## Example
///
/// ```no_run
/// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion};
/// # use stackable_webhook::x509_cert::Certificate;
/// # use tokio::sync::mpsc::channel;
/// # use kube::Client;
/// use stackable_operator::crd::maintainer::{
/// CustomResourceDefinitionMaintainerOptions,
/// CustomResourceDefinitionMaintainer,
/// };
///
/// # #[tokio::main]
/// # async fn main() {
/// # let (certificate_tx, certificate_rx) = channel(1);
/// let options = CustomResourceDefinitionMaintainerOptions {
/// operator_service_name: "my-service-name".to_owned(),
/// operator_namespace: "my-namespace".to_owned(),
/// field_manager: "my-operator".to_owned(),
/// webhook_https_port: 8443,
/// disabled: true,
/// };
///
/// let client = Client::try_default().await.unwrap();
///
/// let definitions = vec![
/// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(),
/// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(),
/// ];
///
/// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
/// client,
/// certificate_rx,
/// definitions,
/// options,
/// );
/// # }
/// ```
pub fn new(
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions,
) -> (Self, oneshot::Receiver<()>) {
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();

let maintainer = Self {
definitions: definitions.into_iter().collect(),
initial_reconcile_tx,
certificate_rx,
options,
client,
};

(maintainer, initial_reconcile_rx)
}

/// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously.
///
/// This needs to be polled in parallel with other parts of an operator, like controllers or
/// webhook servers. If it is disabled, the returned future immediately resolves to
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources.
pub async fn run(mut self) -> Result<(), Error> {
let CustomResourceDefinitionMaintainerOptions {
operator_service_name,
operator_namespace,
field_manager,
webhook_https_port: https_port,
disabled,
} = self.options;

// If the maintainer is disabled or there are no custom resource definitions, immediately
// return without doing any work.
if disabled || self.definitions.is_empty() {
return Ok(());
}

// This channel can only be used exactly once. The sender's send method consumes self, and
// as such, the sender is wrapped in an Option to be able to call take to consume the inner
// value.
let mut initial_reconcile_tx = Some(self.initial_reconcile_tx);

// This get's polled by the async runtime on a regular basis (or when woken up). Once we
// receive a message containing the newly generated TLS certificate for the conversion
// webhook, we need to update the caBundle in the CRD.
while let Some(certificate) = self.certificate_rx.recv().await {
tracing::info!(
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
"reconciling custom resource definitions"
);

// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = certificate
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;

let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());

for mut crd in self.definitions.iter().cloned() {
let crd_kind = &crd.spec.names.kind;
let crd_name = crd.name_any();

tracing::debug!(
k8s.crd.kind = crd_kind,
k8s.crd.name = crd_name,
"reconciling custom resource definition"
);

crd.spec.conversion = Some(CustomResourceConversion {
strategy: "Webhook".to_owned(),
webhook: Some(WebhookConversion {
// conversionReviewVersions indicates what ConversionReview versions are
// supported by the webhook. The first version in the list understood by the
// API server is sent to the webhook. The webhook must respond with a
// ConversionReview object in the same version it received. We only support
// the stable v1 ConversionReview to keep the implementation as simple as
// possible.
conversion_review_versions: vec!["v1".to_owned()],
client_config: Some(WebhookClientConfig {
service: Some(ServiceReference {
name: operator_service_name.clone(),
namespace: operator_namespace.clone(),
path: Some(format!("/convert/{crd_name}")),
port: Some(https_port.into()),
}),
// Here, ByteString takes care of encoding the provided content as
// base64.
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
url: None,
}),
}),
});

// Deploy the updated CRDs using a server-side apply.
let patch = Patch::Apply(&crd);
let patch_params = PatchParams::apply(&field_manager);
crd_api
.patch(&crd_name, &patch_params, &patch)
.await
.with_context(|_| PatchCrdSnafu { crd_name })?;
}

// After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out
// via the oneshot channel.
if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() {
ensure!(
initial_reconcile_tx.send(()).is_ok(),
SendInitialReconcileHeartbeatSnafu
);
}
}

Ok(())
}
}

// TODO (@Techassi): Make this a builder instead
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`].
pub struct CustomResourceDefinitionMaintainerOptions {
/// The service name used by the operator/conversion webhook.
pub operator_service_name: String,

/// The namespace the operator/conversion webhook runs in.
pub operator_namespace: String,

/// The name of the field manager used for the server-side apply.
pub field_manager: String,

/// The HTTPS port the conversion webhook listens on.
pub webhook_https_port: u16,

/// Indicates if the maintainer should be disabled.
pub disabled: bool,
}
2 changes: 2 additions & 0 deletions crates/stackable-operator/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize};
pub mod authentication;
pub mod git_sync;
pub mod listener;
#[cfg(feature = "webhook")]
pub mod maintainer;
pub mod s3;

/// A reference to a product cluster (for example, a `ZookeeperCluster`)
Expand Down
15 changes: 15 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- BREAKING: The `ConversionWebhookServer` now returns a pair of values ([#1099]):
- The conversion webhook server itself
- A `mpsc::Receiver<Certificate>` to provide consumers the newly generated TLS certificate
- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on
`(Conversion)WebhookServer` instead of free-standing ones ([#1099]).

### Removed

- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions`
are removed ([#1099]).

[#1099]: https://github.com/stackabletech/operator-rs/pull/1099

## [0.6.0] - 2025-09-09

### Added
Expand Down
4 changes: 4 additions & 0 deletions crates/stackable-webhook/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ license.workspace = true
edition.workspace = true
repository.workspace = true

[features]
# Re-export selected items from the x509-cert crate needed by the CRD maintainer
maintainer = []

[dependencies]
stackable-certs = { path = "../stackable-certs", features = ["rustls"] }
stackable-shared = { path = "../stackable-shared" }
Expand Down
21 changes: 0 additions & 21 deletions crates/stackable-webhook/src/constants.rs

This file was deleted.

Loading