6
6
7
7
from test_base import TestBase
8
8
9
- from warnet .constants import WARGAMES_NAMESPACE_PREFIX
10
- from warnet .k8s import get_kubeconfig_value , get_static_client
9
+ from warnet .constants import KUBECONFIG , WARGAMES_NAMESPACE_PREFIX
10
+ from warnet .k8s import (
11
+ K8sError ,
12
+ get_kubeconfig_value ,
13
+ get_static_client ,
14
+ open_kubeconfig ,
15
+ write_kubeconfig ,
16
+ )
11
17
from warnet .process import run_command
12
18
13
19
@@ -29,15 +35,34 @@ def run_test(self):
29
35
try :
30
36
os .chdir (self .tmpdir )
31
37
self .log .info (f"Running test in: { self .tmpdir } " )
38
+ self .establish_initial_context ()
39
+ self .establish_names ()
32
40
self .setup_namespaces ()
33
- self .initial_context = get_kubeconfig_value ("{.current-context}" )
34
41
self .setup_service_accounts ()
35
42
self .deploy_network_in_team_namespaces ()
36
43
self .authenticate_and_become_bob ()
37
44
self .return_to_intial_context ()
38
45
finally :
46
+ try :
47
+ self .cleanup_kubeconfig ()
48
+ except K8sError as e :
49
+ self .log .info (f"KUBECONFIG cleanup error: { e } " )
39
50
self .cleanup ()
40
51
52
+ def establish_initial_context (self ):
53
+ self .initial_context = get_kubeconfig_value ("{.current-context}" )
54
+ self .log .info (f"Initial context: { self .initial_context } " )
55
+
56
+ def establish_names (self ):
57
+ self .bob_user = "bob-warnettest"
58
+ self .bob_auth_file = "bob-warnettest-wargames-red-team-warnettest-kubeconfig"
59
+ self .bob_context = "bob-warnettest-wargames-red-team-warnettest"
60
+
61
+ self .blue_namespace = "wargames-blue-team-warnettest"
62
+ self .red_namespace = "wargames-red-team-warnettest"
63
+ self .blue_users = ["carol-warnettest" , "default" , "mallory-warnettest" ]
64
+ self .red_users = ["alice-warnettest" , self .bob_user , "default" ]
65
+
41
66
def return_to_intial_context (self ):
42
67
cmd = f"kubectl config use-context { self .initial_context } "
43
68
self .log .info (run_command (cmd ))
@@ -59,6 +84,7 @@ def setup_service_accounts(self):
59
84
self .log .info ("Creating service accounts..." )
60
85
self .log .info (self .warnet ("admin create-kubeconfigs" ))
61
86
self .wait_for_predicate (self .service_accounts_are_validated )
87
+ self .log .info ("Service accounts have been set up and validated" )
62
88
63
89
def deploy_network_in_team_namespaces (self ):
64
90
self .log .info ("Deploy networks to team namespaces" )
@@ -70,8 +96,8 @@ def deploy_network_in_team_namespaces(self):
70
96
def authenticate_and_become_bob (self ):
71
97
self .log .info ("Authenticating and becoming bob..." )
72
98
assert get_kubeconfig_value ("{.current-context}" ) == self .initial_context
73
- self .log . info ( self . warnet ("auth kubeconfigs/bob-wargames-red-team-kubeconfig" ) )
74
- assert get_kubeconfig_value ("{.current-context}" ) == "bob-wargames-red-team"
99
+ self .warnet (f "auth kubeconfigs/{ self . bob_auth_file } " )
100
+ assert get_kubeconfig_value ("{.current-context}" ) == self . bob_context
75
101
76
102
def service_accounts_are_validated (self ) -> bool :
77
103
self .log .info ("Checking service accounts" )
@@ -93,8 +119,8 @@ def service_accounts_are_validated(self) -> bool:
93
119
maybe_service_accounts .setdefault (namespace , []).append (sa .metadata .name )
94
120
95
121
expected = {
96
- "wargames-blue-team" : [ "carol" , "default" , "mallory" ] ,
97
- "wargames-red-team" : [ "alice" , "bob" , "default" ] ,
122
+ self . blue_namespace : self . blue_users ,
123
+ self . red_namespace : self . red_users ,
98
124
}
99
125
100
126
return maybe_service_accounts == expected
@@ -115,11 +141,37 @@ def two_namespaces_are_validated(self) -> bool:
115
141
maybe_namespaces = self .get_namespaces ()
116
142
if maybe_namespaces is None :
117
143
return False
118
- if len (maybe_namespaces ) != 2 :
119
- return False
120
- if "wargames-blue-team" not in maybe_namespaces :
144
+ if self .blue_namespace not in maybe_namespaces :
121
145
return False
122
- return "wargames-red-team" in maybe_namespaces
146
+ return self .red_namespace in maybe_namespaces
147
+
148
+ def cleanup_kubeconfig (self ):
149
+ try :
150
+ kubeconfig_data = open_kubeconfig (KUBECONFIG )
151
+ except K8sError as e :
152
+ raise K8sError (f"Could not open KUBECONFIG: { KUBECONFIG } " ) from e
153
+
154
+ kubeconfig_data = remove_user (kubeconfig_data , self .bob_user )
155
+ kubeconfig_data = remove_context (kubeconfig_data , self .bob_context )
156
+
157
+ try :
158
+ write_kubeconfig (kubeconfig_data , KUBECONFIG )
159
+ except Exception as e :
160
+ raise K8sError (f"Could not write to KUBECONFIG: { KUBECONFIG } " ) from e
161
+
162
+
163
+ def remove_user (kubeconfig_data : dict , username : str ) -> dict :
164
+ kubeconfig_data ["users" ] = [
165
+ user for user in kubeconfig_data ["users" ] if user ["name" ] != username
166
+ ]
167
+ return kubeconfig_data
168
+
169
+
170
+ def remove_context (kubeconfig_data : dict , context_name : str ) -> dict :
171
+ kubeconfig_data ["contexts" ] = [
172
+ context for context in kubeconfig_data ["contexts" ] if context ["name" ] != context_name
173
+ ]
174
+ return kubeconfig_data
123
175
124
176
125
177
if __name__ == "__main__" :
0 commit comments