Skip to content

Commit 99eae00

Browse files
committed
feat: Add ConversionWebhookServer::with_maintainer
1 parent 434c84a commit 99eae00

File tree

8 files changed

+113
-64
lines changed

8 files changed

+113
-64
lines changed

crates/stackable-operator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ stackable-operator-derive = { path = "../stackable-operator-derive" }
2424
stackable-shared = { path = "../stackable-shared", features = ["chrono", "time"] }
2525
stackable-telemetry = { path = "../stackable-telemetry", optional = true, features = ["clap"] }
2626
stackable-versioned = { path = "../stackable-versioned", optional = true }
27-
stackable-webhook = { path = "../stackable-webhook", optional = true, features = ["maintainer"]}
27+
stackable-webhook = { path = "../stackable-webhook", optional = true}
2828

2929
chrono.workspace = true
3030
clap.workspace = true

crates/stackable-operator/src/crd/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ use serde::{Deserialize, Serialize};
77
pub mod authentication;
88
pub mod git_sync;
99
pub mod listener;
10-
#[cfg(feature = "webhook")]
11-
pub mod maintainer;
1210
pub mod s3;
1311

1412
/// A reference to a product cluster (for example, a `ZookeeperCluster`)

crates/stackable-webhook/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ license.workspace = true
66
edition.workspace = true
77
repository.workspace = true
88

9-
[features]
10-
# Re-export selected items from the x509-cert crate needed by the CRD maintainer
11-
maintainer = []
12-
139
[dependencies]
1410
stackable-certs = { path = "../stackable-certs", features = ["rustls"] }
1511
stackable-shared = { path = "../stackable-shared" }

crates/stackable-webhook/src/lib.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,11 @@ use tower::ServiceBuilder;
4343
pub use crate::options::WebhookOptions;
4444
use crate::tls::TlsServer;
4545

46+
pub mod maintainer;
4647
pub mod options;
4748
pub mod servers;
4849
pub mod tls;
4950

50-
#[cfg(feature = "maintainer")]
51-
pub mod x509_cert {
52-
pub use ::x509_cert::{
53-
Certificate,
54-
der::{EncodePem, Error, pem::LineEnding},
55-
};
56-
}
57-
5851
/// A generic webhook handler receiving a request and sending back a response.
5952
///
6053
/// This trait is not intended to be implemented by external crates and this

crates/stackable-operator/src/crd/maintainer.rs renamed to crates/stackable-webhook/src/maintainer.rs

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ use kube::{
1010
api::{Patch, PatchParams},
1111
};
1212
use snafu::{ResultExt, Snafu, ensure};
13-
use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding};
1413
use tokio::sync::{mpsc, oneshot};
14+
use x509_cert::{
15+
Certificate,
16+
der::{EncodePem, pem::LineEnding},
17+
};
1518

1619
#[derive(Debug, Snafu)]
1720
pub enum Error {
1821
#[snafu(display("failed to encode CA certificate as PEM format"))]
19-
EncodeCertificateAuthorityAsPem { source: x509_cert::Error },
22+
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },
2023

2124
#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
2225
SendInitialReconcileHeartbeat,
@@ -34,17 +37,17 @@ pub enum Error {
3437
///
3538
/// - Apply the CRDs when starting up
3639
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
37-
pub struct CustomResourceDefinitionMaintainer {
40+
pub struct CustomResourceDefinitionMaintainer<'a> {
3841
client: Client,
3942
certificate_rx: mpsc::Receiver<Certificate>,
4043

4144
definitions: Vec<CustomResourceDefinition>,
42-
options: CustomResourceDefinitionMaintainerOptions,
45+
options: CustomResourceDefinitionMaintainerOptions<'a>,
4346

4447
initial_reconcile_tx: oneshot::Sender<()>,
4548
}
4649

47-
impl CustomResourceDefinitionMaintainer {
50+
impl<'a> CustomResourceDefinitionMaintainer<'a> {
4851
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
4952
/// custom resource definitions.
5053
///
@@ -79,10 +82,10 @@ impl CustomResourceDefinitionMaintainer {
7982
///
8083
/// ```no_run
8184
/// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion};
82-
/// # use stackable_webhook::x509_cert::Certificate;
8385
/// # use tokio::sync::mpsc::channel;
86+
/// # use x509_cert::Certificate;
8487
/// # use kube::Client;
85-
/// use stackable_operator::crd::maintainer::{
88+
/// use stackable_webhook::maintainer::{
8689
/// CustomResourceDefinitionMaintainerOptions,
8790
/// CustomResourceDefinitionMaintainer,
8891
/// };
@@ -91,9 +94,8 @@ impl CustomResourceDefinitionMaintainer {
9194
/// # async fn main() {
9295
/// # let (certificate_tx, certificate_rx) = channel(1);
9396
/// let options = CustomResourceDefinitionMaintainerOptions {
94-
/// operator_service_name: "my-service-name".to_owned(),
95-
/// operator_namespace: "my-namespace".to_owned(),
96-
/// field_manager: "my-operator".to_owned(),
97+
/// operator_name: "my-service-name",
98+
/// operator_namespace: "my-namespace",
9799
/// webhook_https_port: 8443,
98100
/// disabled: true,
99101
/// };
@@ -117,7 +119,7 @@ impl CustomResourceDefinitionMaintainer {
117119
client: Client,
118120
certificate_rx: mpsc::Receiver<Certificate>,
119121
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
120-
options: CustomResourceDefinitionMaintainerOptions,
122+
options: CustomResourceDefinitionMaintainerOptions<'a>,
121123
) -> (Self, oneshot::Receiver<()>) {
122124
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();
123125

@@ -139,10 +141,9 @@ impl CustomResourceDefinitionMaintainer {
139141
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources.
140142
pub async fn run(mut self) -> Result<(), Error> {
141143
let CustomResourceDefinitionMaintainerOptions {
142-
operator_service_name,
143144
operator_namespace,
144-
field_manager,
145-
webhook_https_port: https_port,
145+
webhook_https_port,
146+
operator_name,
146147
disabled,
147148
} = self.options;
148149

@@ -173,7 +174,7 @@ impl CustomResourceDefinitionMaintainer {
173174

174175
let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
175176

176-
for mut crd in self.definitions.iter().cloned() {
177+
for crd in self.definitions.iter_mut() {
177178
let crd_kind = &crd.spec.names.kind;
178179
let crd_name = crd.name_any();
179180

@@ -195,10 +196,10 @@ impl CustomResourceDefinitionMaintainer {
195196
conversion_review_versions: vec!["v1".to_owned()],
196197
client_config: Some(WebhookClientConfig {
197198
service: Some(ServiceReference {
198-
name: operator_service_name.clone(),
199-
namespace: operator_namespace.clone(),
199+
name: operator_name.to_owned(),
200+
namespace: operator_namespace.to_owned(),
200201
path: Some(format!("/convert/{crd_name}")),
201-
port: Some(https_port.into()),
202+
port: Some(webhook_https_port.into()),
202203
}),
203204
// Here, ByteString takes care of encoding the provided content as
204205
// base64.
@@ -210,7 +211,7 @@ impl CustomResourceDefinitionMaintainer {
210211

211212
// Deploy the updated CRDs using a server-side apply.
212213
let patch = Patch::Apply(&crd);
213-
let patch_params = PatchParams::apply(&field_manager);
214+
let patch_params = PatchParams::apply(operator_name);
214215
crd_api
215216
.patch(&crd_name, &patch_params, &patch)
216217
.await
@@ -233,15 +234,12 @@ impl CustomResourceDefinitionMaintainer {
233234

234235
// TODO (@Techassi): Make this a builder instead
235236
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`].
236-
pub struct CustomResourceDefinitionMaintainerOptions {
237-
/// The service name used by the operator/conversion webhook.
238-
pub operator_service_name: String,
237+
pub struct CustomResourceDefinitionMaintainerOptions<'a> {
238+
/// The service name used by the operator/conversion webhook and as a field manager.
239+
pub operator_name: &'a str,
239240

240241
/// The namespace the operator/conversion webhook runs in.
241-
pub operator_namespace: String,
242-
243-
/// The name of the field manager used for the server-side apply.
244-
pub field_manager: String,
242+
pub operator_namespace: &'a str,
245243

246244
/// The HTTPS port the conversion webhook listens on.
247245
pub webhook_https_port: u16,

crates/stackable-webhook/src/servers/conversion.rs

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@ use std::{fmt::Debug, net::SocketAddr};
22

33
use axum::{Json, Router, routing::post};
44
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
5-
use kube::ResourceExt;
65
// Re-export this type because users of the conversion webhook server require
76
// this type to write the handler function. Instead of importing this type from
87
// kube directly, consumers can use this type instead. This also eliminates
98
// keeping the kube dependency version in sync between here and the operator.
109
pub use kube::core::conversion::ConversionReview;
10+
use kube::{Client, ResourceExt};
1111
use snafu::{ResultExt, Snafu};
12-
use tokio::sync::mpsc;
12+
use tokio::sync::{mpsc, oneshot};
1313
use tracing::instrument;
1414
use x509_cert::Certificate;
1515

16-
use crate::{WebhookError, WebhookHandler, WebhookServer, options::WebhookOptions};
16+
use crate::{
17+
WebhookError, WebhookHandler, WebhookServer,
18+
maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions},
19+
options::WebhookOptions,
20+
};
1721

1822
#[derive(Debug, Snafu)]
1923
pub enum ConversionWebhookError {
@@ -53,15 +57,15 @@ where
5357

5458
// TODO: Add a builder, maybe with `bon`.
5559
#[derive(Debug)]
56-
pub struct ConversionWebhookOptions {
60+
pub struct ConversionWebhookOptions<'a> {
5761
/// The bind address to bind the HTTPS server to.
5862
pub socket_addr: SocketAddr,
5963

6064
/// The namespace the operator/webhook is running in.
61-
pub namespace: String,
65+
pub namespace: &'a str,
6266

6367
/// The name of the Kubernetes service which points to the operator/webhook.
64-
pub service_name: String,
68+
pub service_name: &'a str,
6569
}
6670

6771
/// A ready-to-use CRD conversion webhook server.
@@ -77,6 +81,8 @@ impl ConversionWebhookServer {
7781
/// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being
7882
/// made to the `/convert/{CRD_NAME}` endpoint.
7983
///
84+
/// The TLS certificate is automatically generated and rotated.
85+
///
8086
/// ## Parameters
8187
///
8288
/// This function expects the following parameters:
@@ -119,8 +125,8 @@ impl ConversionWebhookServer {
119125
///
120126
/// let options = ConversionWebhookOptions {
121127
/// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS,
122-
/// namespace: "stackable-operators".to_owned(),
123-
/// service_name: "product-operator".to_owned(),
128+
/// namespace: "stackable-operators",
129+
/// service_name: "product-operator",
124130
/// };
125131
///
126132
/// let (conversion_webhook_server, _certificate_rx) =
@@ -134,25 +140,25 @@ impl ConversionWebhookServer {
134140
#[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))]
135141
pub async fn new<H>(
136142
crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)>,
137-
options: ConversionWebhookOptions,
143+
options: ConversionWebhookOptions<'_>,
138144
) -> Result<(Self, mpsc::Receiver<Certificate>), ConversionWebhookError>
139145
where
140146
H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
141147
{
142148
tracing::debug!("create new conversion webhook server");
143149

144150
let mut router = Router::new();
145-
let mut crds = Vec::new();
151+
146152
for (crd, handler) in crds_and_handlers {
147153
let crd_name = crd.name_any();
148154
let handler_fn = |Json(review): Json<ConversionReview>| async {
149155
let review = handler.call(review);
150156
Json(review)
151157
};
152158

159+
// TODO (@Techassi): Make this part of the trait mentioned above
153160
let route = format!("/convert/{crd_name}");
154161
router = router.route(&route, post(handler_fn));
155-
crds.push(crd);
156162
}
157163

158164
let ConversionWebhookOptions {
@@ -180,6 +186,61 @@ impl ConversionWebhookServer {
180186
Ok((Self(server), certificate_rx))
181187
}
182188

189+
/// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`],
190+
/// and a [`oneshot::Receiver`].
191+
///
192+
/// See the referenced items for more details on usage.
193+
pub async fn with_maintainer<'a, H>(
194+
// TODO (@Techassi): Use a trait type here which can be used to build all part of the
195+
// conversion webhook server and a CRD maintainer.
196+
crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)> + Clone,
197+
operator_name: &'a str,
198+
operator_namespace: &'a str,
199+
disable_maintainer: bool,
200+
client: Client,
201+
) -> Result<
202+
(
203+
Self,
204+
CustomResourceDefinitionMaintainer<'a>,
205+
oneshot::Receiver<()>,
206+
),
207+
ConversionWebhookError,
208+
>
209+
where
210+
H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
211+
{
212+
let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS;
213+
214+
// TODO (@Techassi): These should be moved into a builder
215+
let webhook_options = ConversionWebhookOptions {
216+
namespace: operator_namespace,
217+
service_name: operator_name,
218+
socket_addr,
219+
};
220+
221+
let (conversion_webhook_server, certificate_rx) =
222+
Self::new(crds_and_handlers.clone(), webhook_options).await?;
223+
224+
let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd);
225+
226+
// TODO (@Techassi): These should be moved into a builder
227+
let maintainer_options = CustomResourceDefinitionMaintainerOptions {
228+
webhook_https_port: socket_addr.port(),
229+
disabled: disable_maintainer,
230+
operator_namespace,
231+
operator_name,
232+
};
233+
234+
let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
235+
client,
236+
certificate_rx,
237+
definitions,
238+
maintainer_options,
239+
);
240+
241+
Ok((conversion_webhook_server, maintainer, initial_reconcile_rx))
242+
}
243+
183244
/// Runs the [`ConversionWebhookServer`] asynchronously.
184245
pub async fn run(self) -> Result<(), ConversionWebhookError> {
185246
tracing::info!("run conversion webhook server");

0 commit comments

Comments
 (0)