66
77from test_base import TestBase
88
9- from warnet .constants import WARGAMES_NAMESPACE_PREFIX
10- from warnet .k8s import get_kubeconfig_value , get_static_client
9+ from warnet .constants import KUBECONFIG , WARGAMES_NAMESPACE_PREFIX
10+ from warnet .k8s import (
11+ K8sError ,
12+ get_kubeconfig_value ,
13+ get_static_client ,
14+ open_kubeconfig ,
15+ write_kubeconfig ,
16+ )
1117from warnet .process import run_command
1218
1319
@@ -29,15 +35,30 @@ def run_test(self):
2935 try :
3036 os .chdir (self .tmpdir )
3137 self .log .info (f"Running test in: { self .tmpdir } " )
38+ self .establish_names ()
3239 self .setup_namespaces ()
3340 self .initial_context = get_kubeconfig_value ("{.current-context}" )
3441 self .setup_service_accounts ()
3542 self .deploy_network_in_team_namespaces ()
3643 self .authenticate_and_become_bob ()
3744 self .return_to_intial_context ()
3845 finally :
46+ try :
47+ self .cleanup_kubeconfig ()
48+ except K8sError as e :
49+ self .log .info (e )
3950 self .cleanup ()
4051
52+ def establish_names (self ):
53+ self .bob_user = "bob-warnettest"
54+ self .bob_auth_file = "bob-warnettest-wargames-red-team-warnettest-kubeconfig"
55+ self .bob_context = "bob-warnettest-wargames-red-team-warnettest"
56+
57+ self .blue_namespace = "wargames-blue-team-warnettest"
58+ self .red_namespace = "wargames-red-team-warnettest"
59+ self .blue_users = ["carol-warnettest" , "default" , "mallory-warnettest" ]
60+ self .red_users = ["alice-warnettest" , self .bob_user , "default" ]
61+
4162 def return_to_intial_context (self ):
4263 cmd = f"kubectl config use-context { self .initial_context } "
4364 self .log .info (run_command (cmd ))
@@ -70,8 +91,8 @@ def deploy_network_in_team_namespaces(self):
7091 def authenticate_and_become_bob (self ):
7192 self .log .info ("Authenticating and becoming bob..." )
7293 assert get_kubeconfig_value ("{.current-context}" ) == self .initial_context
73- self .log .info (self .warnet ("auth kubeconfigs/bob-wargames-red-team-kubeconfig " ))
74- assert get_kubeconfig_value ("{.current-context}" ) == "bob-wargames-red-team"
94+ self .log .info (self .warnet (f "auth kubeconfigs/{ self . bob_auth_file } " ))
95+ assert get_kubeconfig_value ("{.current-context}" ) == self . bob_context
7596
7697 def service_accounts_are_validated (self ) -> bool :
7798 self .log .info ("Checking service accounts" )
@@ -93,8 +114,8 @@ def service_accounts_are_validated(self) -> bool:
93114 maybe_service_accounts .setdefault (namespace , []).append (sa .metadata .name )
94115
95116 expected = {
96- "wargames-blue-team" : [ "carol" , "default" , "mallory" ] ,
97- "wargames-red-team" : [ "alice" , "bob" , "default" ] ,
117+ self . blue_namespace : self . blue_users ,
118+ self . red_namespace : self . red_users ,
98119 }
99120
100121 return maybe_service_accounts == expected
@@ -117,9 +138,38 @@ def two_namespaces_are_validated(self) -> bool:
117138 return False
118139 if len (maybe_namespaces ) != 2 :
119140 return False
120- if "wargames-blue-team" not in maybe_namespaces :
141+ if self . blue_namespace not in maybe_namespaces :
121142 return False
122- return "wargames-red-team" in maybe_namespaces
143+ return self .red_namespace in maybe_namespaces
144+
145+ def cleanup_kubeconfig (self ):
146+ pass
147+ try :
148+ kubeconfig_data = open_kubeconfig (KUBECONFIG )
149+ except K8sError as e :
150+ raise K8sError (f"Could not open KUBECONFIG: { KUBECONFIG } " ) from e
151+
152+ kubeconfig_data = remove_user (kubeconfig_data , self .bob_user )
153+ kubeconfig_data = remove_context (kubeconfig_data , self .bob_context )
154+
155+ try :
156+ write_kubeconfig (kubeconfig_data , KUBECONFIG )
157+ except Exception as e :
158+ raise K8sError (f"Could not write to KUBECONFIG: { KUBECONFIG } " ) from e
159+
160+
161+ def remove_user (kubeconfig_data : dict , username : str ) -> dict :
162+ kubeconfig_data ["users" ] = [
163+ user for user in kubeconfig_data ["users" ] if user ["name" ] != username
164+ ]
165+ return kubeconfig_data
166+
167+
168+ def remove_context (kubeconfig_data : dict , context_name : str ) -> dict :
169+ kubeconfig_data ["contexts" ] = [
170+ context for context in kubeconfig_data ["contexts" ] if context ["name" ] != context_name
171+ ]
172+ return kubeconfig_data
123173
124174
125175if __name__ == "__main__" :
0 commit comments