1
- use crate :: commons:: networking:: DomainName ;
2
- use crate :: kvp:: LabelSelectorExt ;
3
- use crate :: utils:: cluster_domain:: { self , retrieve_cluster_domain} ;
1
+ use std:: {
2
+ convert:: TryFrom ,
3
+ fmt:: { Debug , Display } ,
4
+ } ;
4
5
5
6
use either:: Either ;
6
7
use futures:: StreamExt ;
7
- use k8s_openapi:: apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ;
8
- use k8s_openapi:: { ClusterResourceScope , NamespaceResourceScope } ;
9
- use kube:: api:: { DeleteParams , ListParams , Patch , PatchParams , PostParams , Resource , ResourceExt } ;
10
- use kube:: client:: Client as KubeClient ;
11
- use kube:: core:: Status ;
12
- use kube:: runtime:: wait:: delete:: delete_and_finalize;
13
- use kube:: runtime:: { watcher, WatchStreamExt } ;
14
- use kube:: { Api , Config } ;
15
- use serde:: de:: DeserializeOwned ;
16
- use serde:: Serialize ;
8
+ use k8s_openapi:: {
9
+ apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector , ClusterResourceScope , NamespaceResourceScope ,
10
+ } ;
11
+ use kube:: {
12
+ api:: { DeleteParams , ListParams , Patch , PatchParams , PostParams , Resource , ResourceExt } ,
13
+ client:: Client as KubeClient ,
14
+ core:: Status ,
15
+ runtime:: { wait:: delete:: delete_and_finalize, watcher, WatchStreamExt } ,
16
+ Api , Config ,
17
+ } ;
18
+ use serde:: { de:: DeserializeOwned , Serialize } ;
17
19
use snafu:: { OptionExt , ResultExt , Snafu } ;
18
- use std:: convert:: TryFrom ;
19
- use std:: fmt:: { Debug , Display } ;
20
20
use tracing:: trace;
21
21
22
+ use crate :: {
23
+ cli:: ProductOperatorRun , kvp:: LabelSelectorExt , utils:: cluster_info:: KubernetesClusterInfo ,
24
+ } ;
25
+
22
26
pub type Result < T , E = Error > = std:: result:: Result < T , E > ;
23
27
24
28
#[ derive( Debug , Snafu ) ]
@@ -79,9 +83,6 @@ pub enum Error {
79
83
80
84
#[ snafu( display( "unable to create kubernetes client" ) ) ]
81
85
CreateKubeClient { source : kube:: Error } ,
82
-
83
- #[ snafu( display( "unable to to resolve kubernetes cluster domain" ) ) ]
84
- ResolveKubernetesClusterDomain { source : cluster_domain:: Error } ,
85
86
}
86
87
87
88
/// This `Client` can be used to access Kubernetes.
@@ -94,15 +95,16 @@ pub struct Client {
94
95
delete_params : DeleteParams ,
95
96
/// Default namespace as defined in the kubeconfig this client has been created from.
96
97
pub default_namespace : String ,
97
- pub kubernetes_cluster_domain : DomainName ,
98
+
99
+ pub kubernetes_cluster_info : KubernetesClusterInfo ,
98
100
}
99
101
100
102
impl Client {
101
103
pub fn new (
102
104
client : KubeClient ,
103
105
field_manager : Option < String > ,
104
106
default_namespace : String ,
105
- kubernetes_cluster_domain : DomainName ,
107
+ kubernetes_cluster_info : KubernetesClusterInfo ,
106
108
) -> Self {
107
109
Client {
108
110
client,
@@ -116,7 +118,7 @@ impl Client {
116
118
} ,
117
119
delete_params : DeleteParams :: default ( ) ,
118
120
default_namespace,
119
- kubernetes_cluster_domain ,
121
+ kubernetes_cluster_info ,
120
122
}
121
123
}
122
124
@@ -515,15 +517,18 @@ impl Client {
515
517
///
516
518
/// ```no_run
517
519
/// use std::time::Duration;
520
+ /// use clap::Parser;
518
521
/// use tokio::time::error::Elapsed;
519
522
/// use kube::runtime::watcher;
520
523
/// use k8s_openapi::api::core::v1::Pod;
521
- /// use stackable_operator::client::{Client, initialize_operator};
524
+ /// use stackable_operator::{cli::ProductOperatorRun, client::{Client, initialize_operator} };
522
525
///
523
526
/// #[tokio::main]
524
527
/// async fn main(){
525
528
///
526
- /// let client: Client = initialize_operator(None).await.expect("Unable to construct client.");
529
+ /// // Parse CLI arguments with Opts::parse() instead
530
+ /// let cli_opts = ProductOperatorRun::parse_from(["run"]);
531
+ /// let client: Client = initialize_operator(&cli_opts, None).await.expect("Unable to construct client.");
527
532
/// let watcher_config: watcher::Config =
528
533
/// watcher::Config::default().fields(&format!("metadata.name=nonexistent-pod"));
529
534
///
@@ -630,39 +635,49 @@ where
630
635
}
631
636
}
632
637
633
- pub async fn initialize_operator ( field_manager : Option < String > ) -> Result < Client > {
638
+ pub async fn initialize_operator (
639
+ cli_opts : & ProductOperatorRun ,
640
+ field_manager : Option < String > ,
641
+ ) -> Result < Client > {
634
642
let kubeconfig: Config = kube:: Config :: infer ( )
635
643
. await
636
644
. map_err ( kube:: Error :: InferConfig )
637
645
. context ( InferKubeConfigSnafu ) ?;
638
646
let default_namespace = kubeconfig. default_namespace . clone ( ) ;
639
647
let client = kube:: Client :: try_from ( kubeconfig) . context ( CreateKubeClientSnafu ) ?;
640
- let cluster_domain = retrieve_cluster_domain ( ) . context ( ResolveKubernetesClusterDomainSnafu ) ? ;
648
+ let cluster_info = KubernetesClusterInfo :: new ( cli_opts ) ;
641
649
642
650
Ok ( Client :: new (
643
651
client,
644
652
field_manager,
645
653
default_namespace,
646
- cluster_domain ,
654
+ cluster_info ,
647
655
) )
648
656
}
649
657
650
658
#[ cfg( test) ]
651
659
mod tests {
660
+ use std:: { collections:: BTreeMap , time:: Duration } ;
661
+
662
+ use clap:: Parser ;
652
663
use futures:: StreamExt ;
653
- use k8s_openapi:: api:: core:: v1:: { Container , Pod , PodSpec } ;
654
- use k8s_openapi:: apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ;
655
- use kube:: api:: { ObjectMeta , PostParams , ResourceExt } ;
656
- use kube:: runtime:: watcher;
657
- use kube:: runtime:: watcher:: Event ;
658
- use std:: collections:: BTreeMap ;
659
- use std:: time:: Duration ;
664
+ use k8s_openapi:: {
665
+ api:: core:: v1:: { Container , Pod , PodSpec } ,
666
+ apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ,
667
+ } ;
668
+ use kube:: {
669
+ api:: { ObjectMeta , PostParams , ResourceExt } ,
670
+ runtime:: watcher:: { self , Event } ,
671
+ } ;
660
672
use tokio:: time:: error:: Elapsed ;
661
673
674
+ use crate :: cli:: ProductOperatorRun ;
675
+
662
676
#[ tokio:: test]
663
677
#[ ignore = "Tests depending on Kubernetes are not ran by default" ]
664
678
async fn k8s_test_wait_created ( ) {
665
- let client = super :: initialize_operator ( None )
679
+ let cli_opts = ProductOperatorRun :: parse_from ( [ "run" ] ) ;
680
+ let client = super :: initialize_operator ( & cli_opts, None )
666
681
. await
667
682
. expect ( "KUBECONFIG variable must be configured." ) ;
668
683
@@ -740,7 +755,8 @@ mod tests {
740
755
#[ tokio:: test]
741
756
#[ ignore = "Tests depending on Kubernetes are not ran by default" ]
742
757
async fn k8s_test_wait_created_timeout ( ) {
743
- let client = super :: initialize_operator ( None )
758
+ let cli_opts = ProductOperatorRun :: parse_from ( [ "run" ] ) ;
759
+ let client = super :: initialize_operator ( & cli_opts, None )
744
760
. await
745
761
. expect ( "KUBECONFIG variable must be configured." ) ;
746
762
@@ -760,7 +776,8 @@ mod tests {
760
776
#[ tokio:: test]
761
777
#[ ignore = "Tests depending on Kubernetes are not ran by default" ]
762
778
async fn k8s_test_list_with_label_selector ( ) {
763
- let client = super :: initialize_operator ( None )
779
+ let cli_opts = ProductOperatorRun :: parse_from ( [ "run" ] ) ;
780
+ let client = super :: initialize_operator ( & cli_opts, None )
764
781
. await
765
782
. expect ( "KUBECONFIG variable must be configured." ) ;
766
783
0 commit comments