66
77from test_base import TestBase
88
9- from warnet .constants import WARGAMES_NAMESPACE_PREFIX
10- from warnet .k8s import get_kubeconfig_value , get_static_client
9+ from warnet .constants import KUBECONFIG , WARGAMES_NAMESPACE_PREFIX
10+ from warnet .k8s import (
11+ K8sError ,
12+ get_kubeconfig_value ,
13+ get_static_client ,
14+ open_kubeconfig ,
15+ write_kubeconfig ,
16+ )
1117from warnet .process import run_command
1218
1319
@@ -29,15 +35,34 @@ def run_test(self):
2935 try :
3036 os .chdir (self .tmpdir )
3137 self .log .info (f"Running test in: { self .tmpdir } " )
38+ self .establish_initial_context ()
39+ self .establish_names ()
3240 self .setup_namespaces ()
33- self .initial_context = get_kubeconfig_value ("{.current-context}" )
3441 self .setup_service_accounts ()
3542 self .deploy_network_in_team_namespaces ()
3643 self .authenticate_and_become_bob ()
3744 self .return_to_intial_context ()
3845 finally :
46+ try :
47+ self .cleanup_kubeconfig ()
48+ except K8sError as e :
49+ self .log .info (f"KUBECONFIG cleanup error: { e } " )
3950 self .cleanup ()
4051
52+ def establish_initial_context (self ):
53+ self .initial_context = get_kubeconfig_value ("{.current-context}" )
54+ self .log .info (f"Initial context: { self .initial_context } " )
55+
56+ def establish_names (self ):
57+ self .bob_user = "bob-warnettest"
58+ self .bob_auth_file = "bob-warnettest-wargames-red-team-warnettest-kubeconfig"
59+ self .bob_context = "bob-warnettest-wargames-red-team-warnettest"
60+
61+ self .blue_namespace = "wargames-blue-team-warnettest"
62+ self .red_namespace = "wargames-red-team-warnettest"
63+ self .blue_users = ["carol-warnettest" , "default" , "mallory-warnettest" ]
64+ self .red_users = ["alice-warnettest" , self .bob_user , "default" ]
65+
4166 def return_to_intial_context (self ):
4267 cmd = f"kubectl config use-context { self .initial_context } "
4368 self .log .info (run_command (cmd ))
@@ -59,6 +84,7 @@ def setup_service_accounts(self):
5984 self .log .info ("Creating service accounts..." )
6085 self .log .info (self .warnet ("admin create-kubeconfigs" ))
6186 self .wait_for_predicate (self .service_accounts_are_validated )
87+ self .log .info ("Service accounts have been set up and validated" )
6288
6389 def deploy_network_in_team_namespaces (self ):
6490 self .log .info ("Deploy networks to team namespaces" )
@@ -70,8 +96,8 @@ def deploy_network_in_team_namespaces(self):
7096 def authenticate_and_become_bob (self ):
7197 self .log .info ("Authenticating and becoming bob..." )
7298 assert get_kubeconfig_value ("{.current-context}" ) == self .initial_context
73- self .log . info ( self . warnet ("auth kubeconfigs/bob-wargames-red-team-kubeconfig" ) )
74- assert get_kubeconfig_value ("{.current-context}" ) == "bob-wargames-red-team"
99+ self .warnet (f "auth kubeconfigs/{ self . bob_auth_file } " )
100+ assert get_kubeconfig_value ("{.current-context}" ) == self . bob_context
75101
76102 def service_accounts_are_validated (self ) -> bool :
77103 self .log .info ("Checking service accounts" )
@@ -93,8 +119,8 @@ def service_accounts_are_validated(self) -> bool:
93119 maybe_service_accounts .setdefault (namespace , []).append (sa .metadata .name )
94120
95121 expected = {
96- "wargames-blue-team" : [ "carol" , "default" , "mallory" ] ,
97- "wargames-red-team" : [ "alice" , "bob" , "default" ] ,
122+ self . blue_namespace : self . blue_users ,
123+ self . red_namespace : self . red_users ,
98124 }
99125
100126 return maybe_service_accounts == expected
@@ -115,11 +141,37 @@ def two_namespaces_are_validated(self) -> bool:
115141 maybe_namespaces = self .get_namespaces ()
116142 if maybe_namespaces is None :
117143 return False
118- if len (maybe_namespaces ) != 2 :
119- return False
120- if "wargames-blue-team" not in maybe_namespaces :
144+ if self .blue_namespace not in maybe_namespaces :
121145 return False
122- return "wargames-red-team" in maybe_namespaces
146+ return self .red_namespace in maybe_namespaces
147+
148+ def cleanup_kubeconfig (self ):
149+ try :
150+ kubeconfig_data = open_kubeconfig (KUBECONFIG )
151+ except K8sError as e :
152+ raise K8sError (f"Could not open KUBECONFIG: { KUBECONFIG } " ) from e
153+
154+ kubeconfig_data = remove_user (kubeconfig_data , self .bob_user )
155+ kubeconfig_data = remove_context (kubeconfig_data , self .bob_context )
156+
157+ try :
158+ write_kubeconfig (kubeconfig_data , KUBECONFIG )
159+ except Exception as e :
160+ raise K8sError (f"Could not write to KUBECONFIG: { KUBECONFIG } " ) from e
161+
162+
163+ def remove_user (kubeconfig_data : dict , username : str ) -> dict :
164+ kubeconfig_data ["users" ] = [
165+ user for user in kubeconfig_data ["users" ] if user ["name" ] != username
166+ ]
167+ return kubeconfig_data
168+
169+
170+ def remove_context (kubeconfig_data : dict , context_name : str ) -> dict :
171+ kubeconfig_data ["contexts" ] = [
172+ context for context in kubeconfig_data ["contexts" ] if context ["name" ] != context_name
173+ ]
174+ return kubeconfig_data
123175
124176
125177if __name__ == "__main__" :
0 commit comments