Skip to content

Commit 1c44459

Browse files
committed
feat: Add CRD maintainer
1 parent b10052c commit 1c44459

File tree

8 files changed

+224
-208
lines changed

8 files changed

+224
-208
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/stackable-operator/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ full = ["certs", "telemetry", "versioned", "time", "webhook", "clap"]
1212
default = ["telemetry", "versioned", "clap"]
1313

1414
clap = []
15-
certs = ["dep:stackable-certs"]
15+
certs = ["dep:stackable-certs", "dep:x509-cert"]
1616
telemetry = ["dep:stackable-telemetry"]
1717
time = ["stackable-shared/time"]
1818
versioned = ["dep:stackable-versioned"]
@@ -53,6 +53,7 @@ tracing.workspace = true
5353
tracing-appender.workspace = true
5454
tracing-subscriber.workspace = true
5555
url.workspace = true
56+
x509-cert = { workspace = true, optional = true }
5657

5758
[dev-dependencies]
5859
rstest.workspace = true
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use k8s_openapi::{
2+
ByteString,
3+
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
4+
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
5+
WebhookConversion,
6+
},
7+
};
8+
use kube::{
9+
Api, Client, ResourceExt,
10+
api::{Patch, PatchParams},
11+
};
12+
use snafu::{ResultExt, Snafu};
13+
use tokio::sync::{mpsc, oneshot};
14+
use x509_cert::{
15+
Certificate,
16+
der::{EncodePem, pem::LineEnding},
17+
};
18+
19+
#[derive(Debug, Snafu)]
20+
pub enum Error {
21+
#[snafu(display("failed to encode CA certificate as PEM format"))]
22+
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },
23+
24+
#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
25+
SendInitialReconcileHeartbeat,
26+
27+
#[snafu(display("failed to patch CRD {crd_name:?}"))]
28+
PatchCrd {
29+
source: kube::Error,
30+
crd_name: String,
31+
},
32+
}
33+
34+
/// Maintains various custom resource definitions.
35+
///
36+
/// When running this, the following operations are done:
37+
///
38+
/// - Apply the CRDs when starting up
39+
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
40+
pub struct CustomResourceDefinitionMaintainer {
41+
client: Client,
42+
certificate_rx: mpsc::Receiver<Certificate>,
43+
44+
definitions: Vec<CustomResourceDefinition>,
45+
options: CustomResourceDefinitionMaintainerOptions,
46+
47+
initial_reconcile_tx: Option<oneshot::Sender<()>>,
48+
}
49+
50+
impl CustomResourceDefinitionMaintainer {
51+
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
52+
/// custom resource definitions.
53+
pub fn new(
54+
client: Client,
55+
certificate_rx: mpsc::Receiver<Certificate>,
56+
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
57+
options: CustomResourceDefinitionMaintainerOptions,
58+
) -> (Self, oneshot::Receiver<()>) {
59+
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();
60+
let initial_reconcile_tx = Some(initial_reconcile_tx);
61+
62+
let maintainer = Self {
63+
definitions: definitions.into_iter().collect(),
64+
initial_reconcile_tx,
65+
certificate_rx,
66+
options,
67+
client,
68+
};
69+
70+
(maintainer, initial_reconcile_rx)
71+
}
72+
73+
pub async fn run(mut self) -> Result<(), Error> {
74+
let CustomResourceDefinitionMaintainerOptions {
75+
operator_service_name,
76+
operator_namespace,
77+
field_manager,
78+
https_port,
79+
disabled,
80+
} = self.options;
81+
82+
// If the maintainer is disabled, immediately return without doing any work.
83+
if disabled {
84+
return Ok(());
85+
}
86+
87+
while let Some(certificate) = self.certificate_rx.recv().await {
88+
tracing::info!(
89+
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
90+
"reconciling custom resource definitions"
91+
);
92+
93+
let ca_bundle = certificate
94+
.to_pem(LineEnding::LF)
95+
.context(EncodeCertificateAuthorityAsPemSnafu)?;
96+
97+
let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
98+
99+
for mut crd in self.definitions.iter().cloned() {
100+
let crd_kind = &crd.spec.names.kind;
101+
let crd_name = crd.name_any();
102+
103+
tracing::debug!(
104+
k8s.crd.kind = crd_kind,
105+
k8s.crd.name = crd_name,
106+
"reconciling custom resource definition"
107+
);
108+
109+
crd.spec.conversion = Some(CustomResourceConversion {
110+
strategy: "Webhook".to_owned(),
111+
webhook: Some(WebhookConversion {
112+
// conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook.
113+
// The first version in the list understood by the API server is sent to the webhook.
114+
// The webhook must respond with a ConversionReview object in the same version it received.
115+
conversion_review_versions: vec!["v1".to_owned()],
116+
client_config: Some(WebhookClientConfig {
117+
service: Some(ServiceReference {
118+
name: operator_service_name.clone(),
119+
namespace: operator_namespace.clone(),
120+
path: Some(format!("/convert/{crd_name}")),
121+
port: Some(https_port.into()),
122+
}),
123+
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
124+
url: None,
125+
}),
126+
}),
127+
});
128+
129+
let patch = Patch::Apply(&crd);
130+
let patch_params = PatchParams::apply(&field_manager);
131+
crd_api
132+
.patch(&crd_name, &patch_params, &patch)
133+
.await
134+
.with_context(|_| PatchCrdSnafu { crd_name })?;
135+
}
136+
137+
// Once all CRDs are reconciled, send a heartbeat for consumers to be notified that
138+
// custom resources of these kinds can bow be deployed.
139+
if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() {
140+
initial_reconcile_tx
141+
.send(())
142+
.ignore_context(SendInitialReconcileHeartbeatSnafu)?
143+
}
144+
}
145+
146+
Ok(())
147+
}
148+
}
149+
150+
// TODO (@Techassi): Make this a builder instead
151+
pub struct CustomResourceDefinitionMaintainerOptions {
152+
operator_service_name: String,
153+
operator_namespace: String,
154+
field_manager: String,
155+
https_port: u16,
156+
disabled: bool,
157+
}
158+
159+
trait ResultContextExt<T> {
160+
fn ignore_context<C, E2>(self, context: C) -> Result<T, E2>
161+
where
162+
C: snafu::IntoError<E2, Source = snafu::NoneError>,
163+
E2: std::error::Error + snafu::ErrorCompat;
164+
}
165+
166+
impl<T, E> ResultContextExt<T> for Result<T, E> {
167+
fn ignore_context<C, E2>(self, context: C) -> Result<T, E2>
168+
where
169+
C: snafu::IntoError<E2, Source = snafu::NoneError>,
170+
E2: std::error::Error + snafu::ErrorCompat,
171+
{
172+
match self {
173+
Ok(v) => Ok(v),
174+
Err(_) => Err(context.into_error(snafu::NoneError)),
175+
}
176+
}
177+
}

crates/stackable-operator/src/crd/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize};
77
pub mod authentication;
88
pub mod git_sync;
99
pub mod listener;
10+
#[cfg(feature = "certs")]
11+
pub mod maintainer;
1012
pub mod s3;
1113

1214
/// A reference to a product cluster (for example, a `ZookeeperCluster`)

crates/stackable-webhook/src/constants.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

crates/stackable-webhook/src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
//! enable complete control over these details if needed.
2727
//!
2828
//! [1]: crate::servers::ConversionWebhookServer
29+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
30+
2931
use axum::{Router, routing::get};
3032
use futures_util::{FutureExt as _, pin_mut, select};
3133
use snafu::{ResultExt, Snafu};
@@ -40,7 +42,6 @@ use x509_cert::Certificate;
4042
// use tower_http::trace::TraceLayer;
4143
use crate::tls::TlsServer;
4244

43-
pub mod constants;
4445
pub mod options;
4546
pub mod servers;
4647
pub mod tls;
@@ -86,6 +87,15 @@ pub struct WebhookServer {
8687
}
8788

8889
impl WebhookServer {
90+
/// The default HTTPS port `8443`
91+
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
92+
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
93+
/// which represents binding on all network addresses.
94+
pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95+
/// The default socket address `0.0.0.0:8443` the webhook server binds to.
96+
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
97+
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);
98+
8999
/// Creates a new ready-to-use webhook server.
90100
///
91101
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles

crates/stackable-webhook/src/options.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use stackable_certs::PrivateKeyType;
88

9-
use crate::constants::DEFAULT_SOCKET_ADDRESS;
9+
use crate::WebhookServer;
1010

1111
/// Specifies available webhook server options.
1212
///
@@ -82,15 +82,19 @@ impl WebhookOptionsBuilder {
8282
/// Sets the IP address of the socket address the webhook server uses to
8383
/// bind for HTTPS.
8484
pub fn bind_ip(mut self, bind_ip: impl Into<IpAddr>) -> Self {
85-
let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS);
85+
let addr = self
86+
.socket_addr
87+
.get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS);
8688
addr.set_ip(bind_ip.into());
8789
self
8890
}
8991

9092
/// Sets the port of the socket address the webhook server uses to bind
9193
/// for HTTPS.
9294
pub fn bind_port(mut self, bind_port: u16) -> Self {
93-
let addr = self.socket_addr.get_or_insert(DEFAULT_SOCKET_ADDRESS);
95+
let addr = self
96+
.socket_addr
97+
.get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS);
9498
addr.set_port(bind_port);
9599
self
96100
}
@@ -119,7 +123,9 @@ impl WebhookOptionsBuilder {
119123
/// explicitly set option.
120124
pub fn build(self) -> WebhookOptions {
121125
WebhookOptions {
122-
socket_addr: self.socket_addr.unwrap_or(DEFAULT_SOCKET_ADDRESS),
126+
socket_addr: self
127+
.socket_addr
128+
.unwrap_or(WebhookServer::DEFAULT_SOCKET_ADDRESS),
123129
subject_alterative_dns_names: self.subject_alterative_dns_names,
124130
}
125131
}

0 commit comments

Comments
 (0)