@@ -7,12 +7,14 @@ use k8s_openapi::api::{
7
7
use kube:: {
8
8
api:: { ListParams , Patch , PatchParams , PostParams } ,
9
9
core:: { DynamicObject , GroupVersionKind , ObjectList , ObjectMeta , TypeMeta } ,
10
- discovery:: Scope ,
10
+ discovery:: { ApiCapabilities , ApiResource , Scope } ,
11
11
Api , Discovery , ResourceExt ,
12
12
} ;
13
13
use serde:: Deserialize ;
14
14
use snafu:: { OptionExt , ResultExt , Snafu } ;
15
15
use stackable_operator:: { commons:: listener:: Listener , kvp:: Labels } ;
16
+ use tokio:: sync:: RwLock ;
17
+ use tracing:: info;
16
18
17
19
use crate :: {
18
20
platform:: { cluster, credentials:: Credentials } ,
@@ -39,12 +41,15 @@ pub enum Error {
39
41
#[ snafu( display( "failed to deserialize YAML data" ) ) ]
40
42
DeserializeYaml { source : serde_yaml:: Error } ,
41
43
44
+ #[ snafu( display( "failed to run GVK discovery" ) ) ]
45
+ GVKDiscoveryRun { source : kube:: error:: Error } ,
46
+
47
+ #[ snafu( display( "GVK {gvk:?} is not known" ) ) ]
48
+ GVKUnkown { gvk : GroupVersionKind } ,
49
+
42
50
#[ snafu( display( "failed to deploy manifest because type of object {object:?} is not set" ) ) ]
43
51
ObjectType { object : DynamicObject } ,
44
52
45
- #[ snafu( display( "failed to deploy manifest because GVK {gvk:?} cannot be resolved" ) ) ]
46
- DiscoveryResolve { gvk : GroupVersionKind } ,
47
-
48
53
#[ snafu( display( "failed to convert byte string into UTF-8 string" ) ) ]
49
54
ByteStringConvert { source : FromUtf8Error } ,
50
55
@@ -66,7 +71,9 @@ pub enum Error {
66
71
67
72
pub struct Client {
68
73
client : kube:: Client ,
69
- discovery : Discovery ,
74
+
75
+ // Choosing an [`RwLock`] here, as their can be many reads in parallel, but running a discovery is very rare
76
+ discovery : RwLock < Discovery > ,
70
77
}
71
78
72
79
impl Client {
@@ -77,10 +84,7 @@ impl Client {
77
84
. await
78
85
. context ( KubeClientCreateSnafu ) ?;
79
86
80
- let discovery = Discovery :: new ( client. clone ( ) )
81
- . run ( )
82
- . await
83
- . context ( KubeClientFetchSnafu ) ?;
87
+ let discovery = RwLock :: new ( Self :: run_discovery ( client. clone ( ) ) . await ?) ;
84
88
85
89
Ok ( Self { client, discovery } )
86
90
}
@@ -112,9 +116,9 @@ impl Client {
112
116
113
117
let gvk = Self :: gvk_of_typemeta ( object_type) ;
114
118
let ( resource, capabilities) = self
115
- . discovery
116
119
. resolve_gvk ( & gvk)
117
- . context ( DiscoveryResolveSnafu { gvk } ) ?;
120
+ . await ?
121
+ . context ( GVKUnkownSnafu { gvk } ) ?;
118
122
119
123
let api: Api < DynamicObject > = match capabilities. scope {
120
124
Scope :: Cluster => {
@@ -147,9 +151,9 @@ impl Client {
147
151
gvk : & GroupVersionKind ,
148
152
namespace : Option < & str > ,
149
153
) -> Result < Option < ObjectList < DynamicObject > > , Error > {
150
- let object_api_resource = match self . discovery . resolve_gvk ( gvk) {
151
- Some ( ( object_api_resource, _) ) => object_api_resource,
152
- None => {
154
+ let object_api_resource = match self . resolve_gvk ( gvk) . await {
155
+ Ok ( Some ( ( object_api_resource, _) ) ) => object_api_resource,
156
+ _ => {
153
157
return Ok ( None ) ;
154
158
}
155
159
} ;
@@ -175,9 +179,9 @@ impl Client {
175
179
object_name : & str ,
176
180
gvk : & GroupVersionKind ,
177
181
) -> Result < Option < DynamicObject > , Error > {
178
- let object_api_resource = match self . discovery . resolve_gvk ( gvk) {
179
- Some ( ( object_api_resource, _) ) => object_api_resource,
180
- None => {
182
+ let object_api_resource = match self . resolve_gvk ( gvk) . await {
183
+ Ok ( Some ( ( object_api_resource, _) ) ) => object_api_resource,
184
+ _ => {
181
185
return Ok ( None ) ;
182
186
}
183
187
} ;
@@ -383,6 +387,43 @@ impl Client {
383
387
endpoints_api. get ( name) . await . context ( KubeClientFetchSnafu )
384
388
}
385
389
390
+ /// Try to resolve the given [`GroupVersionKind`]. In case the resolution fails a discovery is run to pull in new
391
+ /// GVKs that are not present in the [`Discovery`] cache. Afterwards a normal resolution is issued.
392
+ async fn resolve_gvk (
393
+ & self ,
394
+ gvk : & GroupVersionKind ,
395
+ ) -> Result < Option < ( ApiResource , ApiCapabilities ) > > {
396
+ let resolved = self . discovery . read ( ) . await . resolve_gvk ( gvk) ;
397
+
398
+ Ok ( match resolved {
399
+ Some ( resolved) => Some ( resolved) ,
400
+ None => {
401
+ info ! ( ?gvk, "discovery did not include gvk" ) ;
402
+
403
+ // We take the lock early here to avoid running multiple discoveries in parallel (as they are expensive)
404
+ let mut old_discovery = self . discovery . write ( ) . await ;
405
+
406
+ // We create a new Discovery object here, as [`Discovery::run`] consumes self
407
+ let new_discovery = Self :: run_discovery ( self . client . clone ( ) ) . await ?;
408
+ * old_discovery = new_discovery;
409
+
410
+ // Release the lock as quickly as possible
411
+ drop ( old_discovery) ;
412
+ self . discovery . read ( ) . await . resolve_gvk ( gvk)
413
+ }
414
+ } )
415
+ }
416
+
417
+ /// Creates a new [`Discovery`] object and immediatly runs a discovery.
418
+ #[ tracing:: instrument( skip_all) ]
419
+ async fn run_discovery ( client : kube:: client:: Client ) -> Result < Discovery > {
420
+ info ! ( "running discovery" ) ;
421
+ Discovery :: new ( client)
422
+ . run ( )
423
+ . await
424
+ . context ( GVKDiscoveryRunSnafu )
425
+ }
426
+
386
427
/// Extracts the [`GroupVersionKind`] from [`TypeMeta`].
387
428
fn gvk_of_typemeta ( type_meta : & TypeMeta ) -> GroupVersionKind {
388
429
match type_meta. api_version . split_once ( '/' ) {
0 commit comments