-
-
Notifications
You must be signed in to change notification settings - Fork 16
feat!: Add CRD maintainer #1099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Techassi
wants to merge
18
commits into
main
Choose a base branch
from
feat/crd-maintenance
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
1c44459
feat: Add CRD maintainer
Techassi 41f8649
docs: Add/improve various (doc) comments
Techassi 9aa8941
fix: Use default crypto provider for TLS server
Techassi 6d0916c
chore: Merge branch 'main' into feat/crd-maintenance
Techassi 362eb48
chore(stackable-operator): Gate maintainer behind webhook feature
Techassi c8d0621
fix(stackable-operator): Make options fields public
Techassi 2fc5c28
feat(stackable-operator): Add create_if_missing method to client
Techassi 59a9426
chore(webhook): Add changelog entry
Techassi 77cb05b
chore(operator): Add changelog entry
Techassi 106dd16
chore: Streamline feature gate
Techassi bbf49f5
docs(operator): Add example in doc comment
Techassi ac5b9cb
refactor(operator): Adjust oneshot channel handling
Techassi d51491e
chore: Apply suggestions
Techassi d0c4a57
chore(webhook): Update dev comment
Techassi f1b4ed0
chore: Merge branch 'main' into feat/crd-maintenance
Techassi 44a701a
fix(operator): Import ensure! macro
Techassi 434c84a
chore: Merge branch 'main' into feat/crd-maintenance
Techassi 99eae00
feat: Add ConversionWebhookServer::with_maintainer
Techassi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
use k8s_openapi::{ | ||
ByteString, | ||
apiextensions_apiserver::pkg::apis::apiextensions::v1::{ | ||
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, | ||
WebhookConversion, | ||
}, | ||
}; | ||
use kube::{ | ||
Api, Client, ResourceExt, | ||
api::{Patch, PatchParams}, | ||
}; | ||
use snafu::{ResultExt, Snafu, ensure}; | ||
use stackable_webhook::x509_cert::{self, Certificate, EncodePem, LineEnding}; | ||
use tokio::sync::{mpsc, oneshot}; | ||
|
||
#[derive(Debug, Snafu)] | ||
pub enum Error { | ||
#[snafu(display("failed to encode CA certificate as PEM format"))] | ||
EncodeCertificateAuthorityAsPem { source: x509_cert::Error }, | ||
|
||
#[snafu(display("failed to send initial CRD reconcile heartbeat"))] | ||
SendInitialReconcileHeartbeat, | ||
|
||
#[snafu(display("failed to patch CRD {crd_name:?}"))] | ||
PatchCrd { | ||
source: kube::Error, | ||
crd_name: String, | ||
}, | ||
} | ||
|
||
/// Maintains various custom resource definitions. | ||
/// | ||
/// When running this, the following operations are done: | ||
/// | ||
/// - Apply the CRDs when starting up | ||
/// - Reconcile the CRDs when the conversion webhook certificate is rotated | ||
pub struct CustomResourceDefinitionMaintainer { | ||
client: Client, | ||
certificate_rx: mpsc::Receiver<Certificate>, | ||
|
||
definitions: Vec<CustomResourceDefinition>, | ||
options: CustomResourceDefinitionMaintainerOptions, | ||
|
||
initial_reconcile_tx: oneshot::Sender<()>, | ||
} | ||
|
||
impl CustomResourceDefinitionMaintainer { | ||
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more | ||
/// custom resource definitions. | ||
/// | ||
/// ## Parameters | ||
/// | ||
/// This function expects four parameters: | ||
/// | ||
/// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches | ||
/// the CRDs when the TLS certificate is rotated. | ||
/// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The | ||
/// certificate data sent through the channel is used to set the caBundle in the conversion | ||
/// section of the CRD. | ||
/// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained | ||
/// by this maintainer. If the iterator is empty, the maintainer returns early without doing | ||
/// any work. As such, a polling mechanism which waits for all futures should be used to | ||
/// prevent premature termination of the operator. | ||
/// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various | ||
/// parts of the maintainer. In the future, this will be converted to a builder, to enable a | ||
/// cleaner API interface. | ||
/// | ||
/// ## Return Values | ||
/// | ||
/// This function returns a 2-tuple (pair) of values: | ||
/// | ||
/// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer. | ||
/// See [`CustomResourceDefinitionMaintainer::run`] for more details. | ||
/// - The [`oneshot::Receiver`] which will be used to send out a message once the initial | ||
/// CRD reconciliation ran. This signal can be used to trigger the deployment of custom | ||
/// resources defined by the maintained CRDs. | ||
/// | ||
/// ## Example | ||
/// | ||
/// ```no_run | ||
/// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; | ||
/// # use stackable_webhook::x509_cert::Certificate; | ||
/// # use tokio::sync::mpsc::channel; | ||
/// # use kube::Client; | ||
/// use stackable_operator::crd::maintainer::{ | ||
/// CustomResourceDefinitionMaintainerOptions, | ||
/// CustomResourceDefinitionMaintainer, | ||
/// }; | ||
/// | ||
/// # #[tokio::main] | ||
/// # async fn main() { | ||
/// # let (certificate_tx, certificate_rx) = channel(1); | ||
/// let options = CustomResourceDefinitionMaintainerOptions { | ||
/// operator_service_name: "my-service-name".to_owned(), | ||
/// operator_namespace: "my-namespace".to_owned(), | ||
/// field_manager: "my-operator".to_owned(), | ||
/// webhook_https_port: 8443, | ||
/// disabled: true, | ||
/// }; | ||
/// | ||
/// let client = Client::try_default().await.unwrap(); | ||
/// | ||
/// let definitions = vec![ | ||
/// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), | ||
/// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(), | ||
/// ]; | ||
/// | ||
/// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( | ||
/// client, | ||
/// certificate_rx, | ||
/// definitions, | ||
/// options, | ||
/// ); | ||
/// # } | ||
/// ``` | ||
pub fn new( | ||
client: Client, | ||
certificate_rx: mpsc::Receiver<Certificate>, | ||
definitions: impl IntoIterator<Item = CustomResourceDefinition>, | ||
options: CustomResourceDefinitionMaintainerOptions, | ||
) -> (Self, oneshot::Receiver<()>) { | ||
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); | ||
|
||
let maintainer = Self { | ||
definitions: definitions.into_iter().collect(), | ||
initial_reconcile_tx, | ||
certificate_rx, | ||
options, | ||
client, | ||
}; | ||
|
||
(maintainer, initial_reconcile_rx) | ||
} | ||
|
||
/// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously. | ||
/// | ||
/// This needs to be polled in parallel with other parts of an operator, like controllers or | ||
/// webhook servers. If it is disabled, the returned future immediately resolves to | ||
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources. | ||
pub async fn run(mut self) -> Result<(), Error> { | ||
let CustomResourceDefinitionMaintainerOptions { | ||
operator_service_name, | ||
operator_namespace, | ||
field_manager, | ||
webhook_https_port: https_port, | ||
disabled, | ||
} = self.options; | ||
|
||
// If the maintainer is disabled or there are no custom resource definitions, immediately | ||
// return without doing any work. | ||
if disabled || self.definitions.is_empty() { | ||
return Ok(()); | ||
} | ||
|
||
// This channel can only be used exactly once. The sender's send method consumes self, and | ||
// as such, the sender is wrapped in an Option to be able to call take to consume the inner | ||
// value. | ||
let mut initial_reconcile_tx = Some(self.initial_reconcile_tx); | ||
|
||
// This get's polled by the async runtime on a regular basis (or when woken up). Once we | ||
// receive a message containing the newly generated TLS certificate for the conversion | ||
// webhook, we need to update the caBundle in the CRD. | ||
while let Some(certificate) = self.certificate_rx.recv().await { | ||
tracing::info!( | ||
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(), | ||
"reconciling custom resource definitions" | ||
); | ||
|
||
// The caBundle needs to be provided as a base64-encoded PEM envelope. | ||
let ca_bundle = certificate | ||
.to_pem(LineEnding::LF) | ||
.context(EncodeCertificateAuthorityAsPemSnafu)?; | ||
|
||
let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone()); | ||
|
||
for mut crd in self.definitions.iter().cloned() { | ||
let crd_kind = &crd.spec.names.kind; | ||
let crd_name = crd.name_any(); | ||
|
||
tracing::debug!( | ||
k8s.crd.kind = crd_kind, | ||
k8s.crd.name = crd_name, | ||
"reconciling custom resource definition" | ||
); | ||
|
||
crd.spec.conversion = Some(CustomResourceConversion { | ||
strategy: "Webhook".to_owned(), | ||
webhook: Some(WebhookConversion { | ||
// conversionReviewVersions indicates what ConversionReview versions are | ||
// supported by the webhook. The first version in the list understood by the | ||
// API server is sent to the webhook. The webhook must respond with a | ||
// ConversionReview object in the same version it received. We only support | ||
// the stable v1 ConversionReview to keep the implementation as simple as | ||
// possible. | ||
conversion_review_versions: vec!["v1".to_owned()], | ||
client_config: Some(WebhookClientConfig { | ||
service: Some(ServiceReference { | ||
name: operator_service_name.clone(), | ||
namespace: operator_namespace.clone(), | ||
path: Some(format!("/convert/{crd_name}")), | ||
port: Some(https_port.into()), | ||
}), | ||
// Here, ByteString takes care of encoding the provided content as | ||
// base64. | ||
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), | ||
url: None, | ||
}), | ||
}), | ||
}); | ||
|
||
// Deploy the updated CRDs using a server-side apply. | ||
let patch = Patch::Apply(&crd); | ||
let patch_params = PatchParams::apply(&field_manager); | ||
crd_api | ||
.patch(&crd_name, &patch_params, &patch) | ||
.await | ||
.with_context(|_| PatchCrdSnafu { crd_name })?; | ||
} | ||
|
||
// After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out | ||
// via the oneshot channel. | ||
if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { | ||
ensure!( | ||
initial_reconcile_tx.send(()).is_ok(), | ||
SendInitialReconcileHeartbeatSnafu | ||
); | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
// TODO (@Techassi): Make this a builder instead | ||
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. | ||
pub struct CustomResourceDefinitionMaintainerOptions { | ||
/// The service name used by the operator/conversion webhook. | ||
pub operator_service_name: String, | ||
|
||
/// The namespace the operator/conversion webhook runs in. | ||
pub operator_namespace: String, | ||
|
||
/// The name of the field manager used for the server-side apply. | ||
pub field_manager: String, | ||
|
||
/// The HTTPS port the conversion webhook listens on. | ||
pub webhook_https_port: u16, | ||
|
||
/// Indicates if the maintainer should be disabled. | ||
pub disabled: bool, | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This got removed in 99eae00.