Skip to content

Commit b5cf5a4

Browse files
committed
Merge remote-tracking branch 'origin/main' into test/add-enum-tests
2 parents c856801 + 7c63f56 commit b5cf5a4

File tree

9 files changed

+133
-42
lines changed

9 files changed

+133
-42
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ educe = { version = "0.6.0", default-features = false }
4646
either = "1.6.1"
4747
form_urlencoded = "1.2.0"
4848
futures = { version = "0.3.17", default-features = false }
49-
hashbrown = "0.15.0"
49+
hashbrown = "0.16.0"
5050
home = "0.5.4"
5151
hostname = "0.4"
5252
http = "1.1.0"
@@ -81,7 +81,7 @@ tempfile = "3.1.0"
8181
thiserror = "2.0.3"
8282
tokio = "1.14.0"
8383
tokio-test = "0.4.0"
84-
tokio-tungstenite = "0.27.0"
84+
tokio-tungstenite = "0.28.0"
8585
tokio-util = "0.7.0"
8686
tower = "0.5.1"
8787
tower-http = "0.6.1"

examples/crd_derive.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use kube::{
33
core::object::{HasSpec, HasStatus},
44
CustomResource, CustomResourceExt, Resource,
55
};
6-
use schemars::JsonSchema;
6+
use schemars::{json_schema, JsonSchema};
77
use serde::{Deserialize, Serialize};
88

99
/// Our spec for Foo
@@ -63,7 +63,7 @@ fn main() {
6363
}
6464

6565
fn conditions(_: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
66-
serde_json::from_value(serde_json::json!({
66+
json_schema!({
6767
"type": "array",
6868
"x-kubernetes-list-type": "map",
6969
"x-kubernetes-list-map-keys": ["type"],
@@ -85,8 +85,7 @@ fn conditions(_: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
8585
"type"
8686
],
8787
},
88-
}))
89-
.unwrap()
88+
})
9089
}
9190

9291
// some tests

examples/crd_derive_schema.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use kube::{
99
runtime::wait::{await_condition, conditions},
1010
Client, CustomResource, CustomResourceExt, KubeSchema,
1111
};
12+
use schemars::json_schema;
1213
use serde::{Deserialize, Serialize};
1314

1415
// This example shows how the generated schema affects defaulting and validation.
@@ -121,16 +122,15 @@ impl FooSpec {
121122

122123
// https://kubernetes.io/docs/reference/using-api/server-side-apply/#merge-strategy
123124
fn set_listable_schema(_: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
124-
serde_json::from_value(serde_json::json!({
125+
json_schema!({
125126
"type": "array",
126127
"items": {
127128
"format": "u32",
128129
"minium": 0,
129130
"type": "integer"
130131
},
131132
"x-kubernetes-list-type": "set"
132-
}))
133-
.unwrap()
133+
})
134134
}
135135

136136
fn default_value() -> String {

kube-client/src/api/util/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ mod test {
7979

8080
let node_name = "fakenode";
8181
let fake_node = serde_json::from_value(json!({
82-
"apiVersion": "v1",
83-
"kind": "Node",
84-
"metadata": {
85-
"name": node_name,
82+
"apiVersion": "v1",
83+
"kind": "Node",
84+
"metadata": {
85+
"name": node_name,
8686
},
8787
}))?;
8888

kube-core/src/duration.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,10 @@ impl schemars::JsonSchema for Duration {
283283
}
284284

285285
fn json_schema(_: &mut schemars::generate::SchemaGenerator) -> schemars::Schema {
286-
use schemars::json_schema;
287-
288286
// the format should *not* be "duration", because "duration" means
289287
// the duration is formatted in ISO 8601, as described here:
290288
// https://datatracker.ietf.org/doc/html/draft-handrews-json-schema-validation-02#section-7.3.1
291-
json_schema!({
289+
schemars::json_schema!({
292290
"type": "string",
293291
})
294292
}

kube-runtime/src/utils/predicate.rs

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
use crate::{reflector::ObjectRef, watcher::Error};
1+
use crate::watcher::Error;
22
use core::{
33
pin::Pin,
44
task::{ready, Context, Poll},
55
};
66
use futures::Stream;
7-
use kube_client::Resource;
7+
use kube_client::{api::ObjectMeta, Resource};
88
use pin_project::pin_project;
99
use std::{
1010
collections::{hash_map::DefaultHasher, HashMap},
1111
hash::{Hash, Hasher},
12+
marker::PhantomData,
1213
};
1314

1415
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
@@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
1718
hasher.finish()
1819
}
1920

21+
/// Private cache key that includes UID in equality for predicate filtering
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23+
struct PredicateCacheKey {
24+
name: String,
25+
namespace: Option<String>,
26+
uid: Option<String>,
27+
}
28+
29+
impl From<&ObjectMeta> for PredicateCacheKey {
30+
fn from(meta: &ObjectMeta) -> Self {
31+
Self {
32+
name: meta.name.clone().unwrap_or_default(),
33+
namespace: meta.namespace.clone(),
34+
uid: meta.uid.clone(),
35+
}
36+
}
37+
}
38+
2039
/// A predicate is a hasher of Kubernetes objects stream filtering
2140
pub trait Predicate<K> {
2241
/// A predicate only needs to implement optional hashing when keys exist
@@ -103,7 +122,9 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103122
#[pin]
104123
stream: St,
105124
predicate: P,
106-
cache: HashMap<ObjectRef<K>, u64>,
125+
cache: HashMap<PredicateCacheKey, u64>,
126+
// K: Resource necessary to get .meta() of an object during polling
127+
_phantom: PhantomData<K>,
107128
}
108129
impl<St, K, P> PredicateFilter<St, K, P>
109130
where
@@ -116,6 +137,7 @@ where
116137
stream,
117138
predicate,
118139
cache: HashMap::new(),
140+
_phantom: PhantomData,
119141
}
120142
}
121143
}
@@ -134,17 +156,9 @@ where
134156
break match ready!(me.stream.as_mut().poll_next(cx)) {
135157
Some(Ok(obj)) => {
136158
if let Some(val) = me.predicate.hash_property(&obj) {
137-
let key = ObjectRef::from_obj(&obj);
138-
let changed = if let Some(old) = me.cache.get(&key) {
139-
*old != val
140-
} else {
141-
true
142-
};
143-
if let Some(old) = me.cache.get_mut(&key) {
144-
*old = val;
145-
} else {
146-
me.cache.insert(key, val);
147-
}
159+
let key = PredicateCacheKey::from(obj.meta());
160+
let changed = me.cache.get(&key) != Some(&val);
161+
me.cache.insert(key, val);
148162
if changed {
149163
Some(Ok(obj))
150164
} else {
@@ -251,4 +265,58 @@ pub(crate) mod tests {
251265
assert_eq!(second.meta().generation, Some(2));
252266
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
253267
}
268+
269+
#[tokio::test]
270+
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
271+
use k8s_openapi::api::core::v1::Pod;
272+
273+
let mkobj = |g: i32, uid: &str| {
274+
let p: Pod = serde_json::from_value(json!({
275+
"apiVersion": "v1",
276+
"kind": "Pod",
277+
"metadata": {
278+
"name": "blog",
279+
"namespace": "default",
280+
"generation": Some(g),
281+
"uid": uid,
282+
},
283+
"spec": {
284+
"containers": [{
285+
"name": "blog",
286+
"image": "clux/blog:0.1.0"
287+
}],
288+
}
289+
}))
290+
.unwrap();
291+
p
292+
};
293+
294+
// Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete ->
295+
// create (gen=1, uid=2) -> delete -> create (gen=2, uid=3)
296+
let data = stream::iter([
297+
Ok(mkobj(1, "uid-1")),
298+
Ok(mkobj(1, "uid-1")),
299+
Ok(mkobj(1, "uid-2")),
300+
Ok(mkobj(2, "uid-3")),
301+
]);
302+
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
303+
304+
// mkobj(1, uid-1) passed through
305+
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
306+
assert_eq!(first.meta().generation, Some(1));
307+
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));
308+
309+
// (no repeat mkobj(1, uid-1) - same UID and generation)
310+
// mkobj(1, uid-2) next - different UID detected
311+
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
312+
assert_eq!(second.meta().generation, Some(1));
313+
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));
314+
315+
// mkobj(2, uid-3) next
316+
let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
317+
assert_eq!(third.meta().generation, Some(2));
318+
assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));
319+
320+
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
321+
}
254322
}

kube-runtime/src/watcher.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ where
719719
/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
720720
/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
721721
/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
722+
#[doc(alias = "informer")]
722723
pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
723724
api: Api<K>,
724725
watcher_config: Config,

kube/Cargo.toml

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,50 @@ categories = ["network-programming", "caching", "api-bindings", "encoding"]
1313

1414
[features]
1515
default = ["client", "rustls-tls", "ring"]
16+
#! See [kube.rs/features](https://kube.rs/features/) for more details
1617

17-
# default features
18+
## default client
1819
client = ["kube-client/client", "config"]
20+
## default config
1921
config = ["kube-client/config"]
22+
## default tls stack
2023
rustls-tls = ["kube-client/rustls-tls", "client"]
2124

22-
# alternative features
25+
## legacy tls stack (must disable rustls-tls to use)
2326
openssl-tls = ["kube-client/openssl-tls", "client"]
27+
## aws rustls provider
2428
aws-lc-rs = ["kube-client?/aws-lc-rs"]
29+
## ring rustls provider
2530
ring = ["kube-client?/ring"]
31+
## enable webpki roots in rustls
32+
webpki-roots = ["kube-client/webpki-roots", "client"]
2633

27-
# auxiliary features
34+
## enable kube-derive and proc macros
35+
derive = ["kube-derive", "kube-core/schema"]
36+
## enable runtime for controllers/watchers/reflectors
37+
runtime = ["kube-runtime"]
38+
## enable websocket client support for portforward/exec/attach
2839
ws = ["kube-client/ws", "kube-core/ws"]
29-
kubelet-debug = ["kube-client/kubelet-debug", "kube-core/kubelet-debug"]
40+
## enable client oauth support
3041
oauth = ["kube-client/oauth", "client"]
42+
## enable client oidc support
3143
oidc = ["kube-client/oidc", "client"]
44+
## enable client socks5 support
45+
socks5 = ["kube-client/socks5", "client"]
46+
## enable client http proxy support
47+
http-proxy = ["kube-client/http-proxy", "client"]
48+
## enable client gzip usage
3249
gzip = ["kube-client/gzip", "client"]
50+
## enable support for jsonpatch style patch parameters
3351
jsonpatch = ["kube-core/jsonpatch"]
52+
## enable the admission module
3453
admission = ["kube-core/admission"]
35-
derive = ["kube-derive", "kube-core/schema"]
36-
runtime = ["kube-runtime"]
54+
## enable unstable runtime features
3755
unstable-runtime = ["kube-runtime/unstable-runtime", "runtime"]
56+
## enable unstable client features
3857
unstable-client = ["kube-client/unstable-client", "client"]
39-
socks5 = ["kube-client/socks5", "client"]
40-
http-proxy = ["kube-client/http-proxy", "client"]
41-
webpki-roots = ["kube-client/webpki-roots", "client"]
58+
## enable the kubelet debug interface
59+
kubelet-debug = ["kube-client/kubelet-debug", "kube-core/kubelet-debug"]
4260

4361
[package.metadata.docs.rs]
4462
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime", "socks5", "http-proxy"]

kube/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#![doc(
2+
html_logo_url = "https://user-images.githubusercontent.com/639336/155115130-758a8ba9-e209-42de-bf6d-cde7be3ed86f.svg#only-light"
3+
)]
14
//! Kube is an umbrella-crate for interacting with [Kubernetes](http://kubernetes.io) in Rust.
25
//!
36
//! # Overview
@@ -103,6 +106,9 @@
103106
//!
104107
//! # Examples
105108
//! A large list of complete, runnable examples with explainations are available in the [examples folder](https://github.com/kube-rs/kube/tree/main/examples).
109+
//!
110+
//! # Features
111+
//! Documented at [kube.rs/features](https://kube.rs/features/).
106112
#![cfg_attr(docsrs, feature(doc_cfg))]
107113

108114
macro_rules! cfg_client {
@@ -390,8 +396,9 @@ mod test {
390396
Ok(())
391397
}
392398

393-
#[tokio::test]
394-
#[ignore = "needs cluster (fetches api resources, and lists all)"]
399+
// #[tokio::test]
400+
// #[ignore = "needs cluster (fetches api resources, and lists all)"]
401+
// TODO: fixup. gets rate limited in default k3s on CI now.
395402
#[cfg(feature = "derive")]
396403
async fn derived_resources_discoverable() -> Result<(), Box<dyn std::error::Error>> {
397404
use crate::{
@@ -415,7 +422,7 @@ mod test {
415422
let establish = await_condition(crds.clone(), "testcrs.kube.rs", conditions::is_crd_established());
416423
let crd = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await??;
417424
assert!(conditions::is_crd_established().matches_object(crd.as_ref()));
418-
tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Established condition is actually not enough for api discovery :(
425+
tokio::time::sleep(std::time::Duration::from_secs(5)).await; // Established condition is actually not enough for api discovery :(
419426

420427
// create partial information for it to discover
421428
let gvk = GroupVersionKind::gvk("kube.rs", "v1", "TestCr");

0 commit comments

Comments
 (0)