Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
15ba09d
Add a simple controller example
mateiidavid Jan 31, 2024
fc2fc42
Merge branch 'main' of github.com:kube-rs/kube into matei/arc-watcher
mateiidavid Jan 31, 2024
1657ddc
Add shared stream controller example
mateiidavid Feb 23, 2024
98255dc
Try to get something working
mateiidavid Feb 23, 2024
3685b9e
Rm my notes
mateiidavid Feb 23, 2024
683e77d
Results or object refs
mateiidavid Feb 29, 2024
af7a309
Working shared stream
mateiidavid Feb 29, 2024
8d4d694
Different way of doing it
mateiidavid Feb 29, 2024
8534770
Switch to async_broadcast
mateiidavid Mar 2, 2024
9bbe8e1
Remove old, unused code
mateiidavid Mar 2, 2024
3f874ce
Remove unused examples
mateiidavid Mar 2, 2024
1e1e347
Gotta state machine this stuff
mateiidavid Mar 7, 2024
15f6e1d
Take 1 with try_recv
mateiidavid Mar 8, 2024
49eaf12
try_recv take 2
mateiidavid Mar 8, 2024
e7aad76
Working on names next
mateiidavid Mar 11, 2024
b6ff97f
Ok surprising this worked
mateiidavid Mar 13, 2024
7a570fd
Write tests and rename file to reflect dispatch
mateiidavid Mar 25, 2024
0256cb0
WIP
mateiidavid Mar 25, 2024
74f09f7
WIP 2
mateiidavid Mar 26, 2024
2d5a3b0
Start working on store side
mateiidavid Mar 26, 2024
0cb816b
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Mar 26, 2024
9bf111c
Tests are green
mateiidavid Mar 26, 2024
04a53d1
rm redundant trait bounds
mateiidavid Mar 26, 2024
6b5bd31
Update example with new interfaces
mateiidavid Mar 26, 2024
def0011
Add comments and a small todo
mateiidavid Mar 27, 2024
d69213a
Remove dispatch mod from utils
mateiidavid Mar 27, 2024
21dbbae
@clux's feedback
mateiidavid Apr 3, 2024
c7fc333
@clux's feedback
mateiidavid Apr 3, 2024
1b81f4c
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 3, 2024
c6d1027
Fix tests & clippy warns
mateiidavid Apr 3, 2024
a30f2e6
Run fmt
mateiidavid Apr 3, 2024
fef5d83
Update examples/shared_stream_controllers.rs
mateiidavid Apr 8, 2024
44c441e
@clux's feedback on examples
mateiidavid Apr 11, 2024
8347103
Fix name in ns
mateiidavid Apr 15, 2024
9f7edd1
Add comments and feature flags
mateiidavid Apr 15, 2024
e2399f1
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 16, 2024
a14d6b4
Fix CI checks
mateiidavid Apr 16, 2024
de2eda1
Run rustfmt
mateiidavid Apr 16, 2024
276b75e
@clux's feedback
mateiidavid Apr 17, 2024
eca6be1
Run fmt
mateiidavid Apr 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ crossterm = "0.27.0"
name = "configmapgen_controller"
path = "configmapgen_controller.rs"

[[example]]
name = "shared_stream_controllers"
path = "shared_stream_controllers.rs"

[[example]]
name = "crd_api"
path = "crd_api.rs"
Expand Down
260 changes: 260 additions & 0 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
use kube::{
api::{Patch, PatchParams},
core::ObjectMeta,
runtime::{controller::Action, reflector::store::Writer, watcher, Config, Controller, WatchStreamExt},
Api, Client, ResourceExt,
};
use tracing::{info, info_span, warn, Instrument};

use thiserror::Error;

/// String constants for the `UndocumentedPort` pod condition written by the
/// status controller in this example.
pub mod condition {
    /// Condition type recorded when a pod has a container exposing no ports.
    pub const UNDOCUMENTED_TYPE: &str = "UndocumentedPort";
    /// Condition status value indicating the condition holds.
    pub const STATUS_TRUE: &str = "True";
}

// Buffer size handed to the dispatching store writer below (see
// `Writer::new_with_dispatch` in `main`); bounds how many events each
// subscriber stream can lag behind by.
const SUBSCRIBE_BUFFER_SIZE: usize = 256;

/// Context shared with every reconciler invocation.
#[derive(Clone)]
struct Data {
    // Client used by both reconcilers to patch pods back to the cluster.
    client: Client,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let client = Client::try_default().await?;
let pods = Api::<Pod>::namespaced(client.clone(), "default");
let config = Config::default().concurrency(2);
let ctx = Arc::new(Data { client });

// (1): create a store (with a dispatcher)
let writer = Writer::<Pod>::new_with_dispatch(Default::default(), SUBSCRIBE_BUFFER_SIZE);
// (2): create a subscriber
let subscriber = writer.subscribe();
// (2.5): create a watch stream
let pod_watch = watcher(pods.clone(), Default::default())
.default_backoff()
.reflect_dispatch(writer);

// (3): schedule the root (i.e. shared) stream with the runtime.
//
// The runtime (tokio) will drive this task to readiness; the stream is
// polled continously and allows all downstream readers (i.e. subscribers)
// to make progress.
tokio::spawn(async move {
// Pin on the heap so we don't overflow our stack
let mut watch = pod_watch.boxed();
while let Some(next) = watch.next().await {
// We are not interested in the returned events here, only in
// handling errors.
match next {
Err(error) => tracing::error!(%error, "Received error from main watcher stream"),
_ => {}
}
}
});

// (4): create a reader. We create a metadata controller that will mirror a
// pod's labels as annotations.
//
// To create a controller that operates on a shared stream, we need two
// handles:
// - A handle to the store.
// - A handle to a shared stream.
//
// The handle to the shared stream will be used to receive shared objects as
// they are applied by the reflector.
let reader = subscriber.reader();
// Store readers can be created on-demand by calling `reader()` on a shared
// stream handle. Stream handles are cheap to clone.
let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
.with_config(config.clone())
.shutdown_on_signal()
.run(
reconcile_metadata,
|_, _, _| Action::requeue(Duration::from_secs(1)),
ctx.clone(),
)
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile object"),
}
})
.instrument(info_span!("metadata_controller"));

// (5): Create status controller. Our status controller write a condition
// whenever a pod has undocumented container ports (i.e. containers with no
// exposed ports).
//
// This is the last controller we will create, so we can just move the
// handle inside the controller.
let reader = subscriber.reader();
let status_controller = Controller::for_shared_stream(subscriber, reader)
.with_config(config)
.shutdown_on_signal()
.run(
reconcile_status,
|_, _, _| Action::requeue(Duration::from_secs(1)),
ctx,
)
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile object"),
}
})
.instrument(info_span!("status_controller"));

let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
// (6): Last step, drive controllers to readiness. Controllers are futures
// and need to be driven to make progress. A controller that's not driven
// and operates on a subscribed stream will eventually block the shared stream.
tokio::select! {
_ = metadata_controller => {
},

_ = status_controller => {
},

_ = terminate.recv() => {
info!("Received term signal; shutting down...")
}

}

Ok(())
}

#[derive(Debug, Error)]
enum Error {
#[error("Failed to patch pod: {0}")]
WriteFailed(#[source] kube::Error),

#[error("Missing po field: {0}")]
MissingField(&'static str),
}

/// Controller will trigger this whenever our main pod has changed. The function
/// reconciles a pod by copying over the labels to the annotations
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
if pod.name_any() == "kube-system" {
return Ok(Action::requeue(Duration::from_secs(300)));
}
let labels = pod.metadata.labels.clone().unwrap_or_else(|| Default::default());
if labels.len() == 0 {
return Ok(Action::requeue(Duration::from_secs(180)));
}

let annotations = labels.clone();
let p = Pod {
metadata: ObjectMeta {
name: Some(pod.name_any()),
labels: Some(labels),
annotations: Some(annotations),
..ObjectMeta::default()
},
spec: pod.spec.clone(),
status: pod.status.clone(),
};

let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
);

pod_api
.patch(
&p.name_any(),
&PatchParams::apply("controller-1"),
&Patch::Apply(&p),
)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}

/// Controller will trigger this whenever our main pod has changed. The
/// function reconciles a pod by writing a status if it does not document a
/// port.
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
let mut conditions = pod
.status
.clone()
.unwrap_or_default()
.conditions
.unwrap_or_default();

// If the condition already exists, exit
for cond in conditions.iter() {
if cond.type_ == condition::UNDOCUMENTED_TYPE {
return Ok(Action::requeue(Duration::from_secs(300)));
}
}

pod.spec
.clone()
.unwrap_or_default()
.containers
.iter()
.for_each(|c| {
if c.ports.clone().unwrap_or_default().len() == 0 {
conditions.push(PodCondition {
type_: condition::UNDOCUMENTED_TYPE.into(),
status: condition::STATUS_TRUE.into(),
..Default::default()
})
}
});

let mut current_conds = pod
.status
.clone()
.unwrap_or_default()
.conditions
.unwrap_or_default()
.into_iter()
.filter(|c| c.type_ != condition::UNDOCUMENTED_TYPE && c.status != condition::STATUS_TRUE)
.collect::<Vec<PodCondition>>();

for condition in conditions {
current_conds.push(condition);
}

let status = PodStatus {
conditions: Some(current_conds),
..Default::default()
};
let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
);

let name = pod.name_any();
let value = serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"name": name,
"status": status,
});
let p = Patch::Merge(value);
pod_api
.patch_status(&pod.name_any(), &PatchParams::apply("controller-2"), &p)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}
2 changes: 2 additions & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ backoff.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
async-broadcast = "0.7.0"
async-stream = "0.3.5"

[dev-dependencies]
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
Expand Down
53 changes: 53 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ where
})
}

/// Enqueues the object itself for reconciliation, from a stream of shared
/// (`Arc<K>`) objects.
///
/// Shared-stream counterpart of the owned-object trigger above: items arrive
/// as `Arc<K>` (e.g. from a store dispatcher), so the `ObjectRef` is built via
/// `ObjectRef::from_shared_obj_with` without taking ownership of the object.
pub fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        Some(ReconcileRequest {
            obj_ref: ObjectRef::from_shared_obj_with(obj.clone(), dyntype.clone()),
            reason: ReconcileReason::ObjectUpdated,
        })
    })
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
stream: S,
Expand Down Expand Up @@ -701,6 +718,42 @@ where
}
}

// TODO: do an entrypoint for shared streams of owned objects
//
// Is it better to use a concrete type (i.e. a SubscribeHandle as a trigger)
// or to pass in the reader out-of-band?
/// Create a Controller for a resource `K` driven by a shared stream of
/// `Arc<K>` objects, using the default dynamic type for `K`.
///
/// `trigger` yields objects as they are applied by a dispatching reflector;
/// `reader` is a handle to the store backing that stream.
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
    K::DynamicType: Default,
{
    Self::for_shared_stream_with(trigger, reader, Default::default())
}

/// Create a Controller for a resource `K` driven by a shared stream of
/// `Arc<K>` objects, with a caller-supplied dynamic type.
///
/// `trigger` yields objects as they are applied by a dispatching reflector;
/// `reader` is a handle to the store backing that stream. Use this variant
/// when `K::DynamicType` has no `Default` (e.g. dynamic objects).
pub fn for_shared_stream_with(
    trigger: impl Stream<Item = Arc<K>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    let mut trigger_selector = stream::SelectAll::new();
    // The shared stream is infallible; lift items into `Result` so they fit
    // the common trigger machinery. (`.map(Ok)` replaces the redundant
    // closure `|obj| Ok(obj)`.)
    let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
    trigger_selector.push(self_watcher);
    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        graceful_shutdown_selector: vec![
            // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
            future::pending().boxed(),
        ],
        forceful_shutdown_selector: vec![
            // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
            future::pending().boxed(),
        ],
        dyntype,
        reader,
        config: Default::default(),
    }
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
Expand Down
Loading