Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use kube::{
use serde::de::DeserializeOwned;
use tracing::*;

use std::{env, fmt::Debug};
use std::{env, fmt::Debug, sync::Arc};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -39,9 +39,9 @@ async fn main() -> anyhow::Result<()> {
}

async fn handle_events<
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + DeserializeOwned + 'static,
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + Sync + DeserializeOwned + 'static,
>(
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
stream: impl Stream<Item = watcher::Result<Event<Arc<K>>>> + Send + 'static,
ar: &ApiResource,
) -> anyhow::Result<()> {
let mut items = stream.applied_objects().boxed();
Expand Down
10 changes: 6 additions & 4 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
Expand Down Expand Up @@ -25,12 +27,12 @@ async fn main() -> anyhow::Result<()> {
}

// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
fn handle_event(ev: Arc<Event>) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
ev.message.as_ref().unwrap().trim(),
ev.involved_object.kind.as_ref().unwrap(),
ev.involved_object.name.as_ref().unwrap()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like some of the more awkward user facing interactions can be simplified a bit by making fns like these take a &Event (say) and call as_ref() earlier as you've done in for wait.rs

);
Ok(())
}
8 changes: 5 additions & 3 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
Expand Down Expand Up @@ -31,9 +33,9 @@ async fn main() -> anyhow::Result<()> {
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
enum Watched {
Config(ConfigMap),
Deploy(Deployment),
Secret(Secret),
Config(Arc<ConfigMap>),
Deploy(Arc<Deployment>),
Secret(Arc<Secret>),
}
while let Some(o) = combo_stream.try_next().await? {
match o {
Expand Down
7 changes: 5 additions & 2 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
Expand Down Expand Up @@ -25,12 +27,13 @@ async fn main() -> anyhow::Result<()> {
}

// A simple node problem detector
async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result<()> {
async fn check_for_node_failures(events: &Api<Event>, o: Arc<Node>) -> anyhow::Result<()> {
let name = o.name_any();
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unwrap().unschedulable {
if let Some(true) = o.spec.clone().unwrap().unschedulable {
let failed = o
.status
.clone()
.unwrap()
.conditions
.unwrap()
Expand Down
72 changes: 37 additions & 35 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl Action {
/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
pub fn trigger_with<T, K, I, S>(
stream: S,
mapper: impl Fn(T) -> I,
mapper: impl Fn(Arc<T>) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = T>,
S: TryStream<Ok = Arc<T>>,
I: IntoIterator,
I::Item: Into<ReconcileRequest<K>>,
K: Resource,
Expand All @@ -110,29 +110,29 @@ pub fn trigger_self<K, S>(
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = K>,
S: TryStream<Ok = Arc<K>>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,
})
})
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
fn trigger_others<T, S, K, I>(
stream: S,
mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
dyntype: <S::Ok as Resource>::DynamicType,
mapper: impl Fn(Arc<T>) -> I + Sync + Send + 'static,
dyntype: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
// Input stream has items as some Resource (via Controller::watches)
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
// Output stream is requests for the root type K
K: Resource,
K::DynamicType: Clone,
Expand All @@ -141,7 +141,7 @@ where
I::IntoIter: Send,
{
trigger_with(stream, move |obj| {
let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
mapper(obj)
.into_iter()
.map(move |mapped_obj_ref| ReconcileRequest {
Expand All @@ -154,19 +154,19 @@ where
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(
pub fn trigger_owners<KOwner, S, T>(
stream: S,
owner_type: KOwner::DynamicType,
child_type: <S::Ok as Resource>::DynamicType,
child_type: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
KOwner: Resource,
KOwner::DynamicType: Clone,
{
let mapper = move |obj: S::Ok| {
let mapper = move |obj: Arc<T>| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
Expand Down Expand Up @@ -605,7 +605,7 @@ where
/// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
) -> Self
where
Expand All @@ -629,7 +629,7 @@ where
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream_with(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
dyntype: K::DynamicType,
) -> Self {
Expand Down Expand Up @@ -680,7 +680,9 @@ where
///
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
pub fn owns<
Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static + Sync,
>(
self,
api: Api<Child>,
wc: Config,
Expand All @@ -692,7 +694,7 @@ where
///
/// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[must_use]
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync>(
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
Expand Down Expand Up @@ -750,7 +752,7 @@ where
#[must_use]
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
) -> Self {
self.owns_stream_with(trigger, ())
}
Expand All @@ -768,7 +770,7 @@ where
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
mut self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
dyntype: Child::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -848,10 +850,10 @@ where
self,
api: Api<Other>,
wc: Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
Other::DynamicType: Default + Debug + Clone + Eq + Hash,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Expand All @@ -868,10 +870,10 @@ where
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Other::DynamicType: Debug + Clone + Eq + Hash,
Expand Down Expand Up @@ -904,7 +906,7 @@ where
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
/// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<DaemonSet> = Api::all(client.clone());
/// let cr: Api<CustomResource> = Api::all(client.clone());
Expand All @@ -923,8 +925,8 @@ where
#[must_use]
pub fn watches_stream<Other, I>(
self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Expand All @@ -948,8 +950,8 @@ where
#[must_use]
pub fn watches_stream_with<Other, I>(
mut self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
dyntype: Other::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -1244,7 +1246,7 @@ mod tests {
// and returns a WatchEvent generic over a resource `K`
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
T: Stream<Item = watcher::Result<Event<Arc<K>>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
Expand Down Expand Up @@ -1310,14 +1312,14 @@ mod tests {
);
pin_mut!(applier);
for i in 0..items {
let obj = ConfigMap {
let obj = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
});
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}
Expand Down
Loading