@@ -62,10 +62,10 @@ def has_cluster(self, name):
62
62
Return true if this kubeconfig contains an entry
63
63
For the passed cluster name.
64
64
"""
65
- if 'clusters' not in self . content :
65
+ if self . content . get ( 'clusters' ) is None :
66
66
return False
67
67
return name in [cluster ['name' ]
68
- for cluster in self .content ['clusters' ]]
68
+ for cluster in self .content ['clusters' ] if 'name' in cluster ]
69
69
70
70
71
71
class KubeconfigValidator (object ):
@@ -83,7 +83,7 @@ def validate_config(self, config):
83
83
"""
84
84
if not isinstance (config , Kubeconfig ):
85
85
raise KubeconfigCorruptedError ("Internal error: "
86
- "Not a Kubeconfig object ." )
86
+ f "Not a { Kubeconfig } ." )
87
87
self ._validate_config_types (config )
88
88
self ._validate_list_entry_types (config )
89
89
@@ -96,18 +96,14 @@ def _validate_config_types(self, config):
96
96
:type config: Kubeconfig
97
97
"""
98
98
if not isinstance (config .content , dict ):
99
- raise KubeconfigCorruptedError ("Content not a dictionary ." )
99
+ raise KubeconfigCorruptedError (f "Content not a { dict } ." )
100
100
for key , value in self ._validation_content .items ():
101
101
if (key in config .content and
102
102
config .content [key ] is not None and
103
103
not isinstance (config .content [key ], type (value ))):
104
104
raise KubeconfigCorruptedError (
105
- "{0} is wrong type:{1} "
106
- "(Should be {2})" .format (
107
- key ,
108
- type (config .content [key ]),
109
- type (value )
110
- )
105
+ f"{ key } is wrong type: { type (config .content [key ])} "
106
+ f"(Should be { type (value )} )"
111
107
)
112
108
113
109
def _validate_list_entry_types (self , config ):
@@ -124,14 +120,14 @@ def _validate_list_entry_types(self, config):
124
120
for element in config .content [key ]:
125
121
if not isinstance (element , OrderedDict ):
126
122
raise KubeconfigCorruptedError (
127
- "Entry in {0 } not a dictionary." . format ( key ) )
123
+ f "Entry in { key } not a { dict } . " )
128
124
129
125
130
126
class KubeconfigLoader (object ):
131
- def __init__ (self , validator = None ):
127
+ def __init__ (self , validator = None ):
132
128
if validator is None :
133
- validator = KubeconfigValidator ()
134
- self ._validator = validator
129
+ validator = KubeconfigValidator ()
130
+ self ._validator = validator
135
131
136
132
def load_kubeconfig (self , path ):
137
133
"""
@@ -152,18 +148,18 @@ def load_kubeconfig(self, path):
152
148
"""
153
149
try :
154
150
with open (path , "r" ) as stream :
155
- loaded_content = ordered_yaml_load (stream )
151
+ loaded_content = ordered_yaml_load (stream )
156
152
except IOError as e :
157
153
if e .errno == errno .ENOENT :
158
- loaded_content = None
154
+ loaded_content = None
159
155
else :
160
156
raise KubeconfigInaccessableError (
161
- "Can't open kubeconfig for reading: {0}" . format ( e ) )
157
+ f "Can't open kubeconfig for reading: { e } " )
162
158
except yaml .YAMLError as e :
163
159
raise KubeconfigCorruptedError (
164
- "YamlError while loading kubeconfig: {0}" . format ( e ) )
160
+ f "YamlError while loading kubeconfig: { e } " )
165
161
166
- loaded_config = Kubeconfig (path , loaded_content )
162
+ loaded_config = Kubeconfig (path , loaded_content )
167
163
self ._validator .validate_config (loaded_config )
168
164
169
165
return loaded_config
@@ -181,14 +177,14 @@ def write_kubeconfig(self, config):
181
177
:raises KubeconfigInaccessableError: if the kubeconfig
182
178
can't be opened for writing
183
179
"""
184
- directory = os .path .dirname (config .path )
180
+ directory = os .path .dirname (config .path )
185
181
186
182
try :
187
183
os .makedirs (directory )
188
184
except OSError as e :
189
185
if e .errno != errno .EEXIST :
190
186
raise KubeconfigInaccessableError (
191
- "Can't create directory for writing: {0}" . format ( e ) )
187
+ f "Can't create directory for writing: { e } " )
192
188
try :
193
189
with os .fdopen (
194
190
os .open (
@@ -199,42 +195,44 @@ def write_kubeconfig(self, config):
199
195
ordered_yaml_dump (config .content , stream )
200
196
except (IOError , OSError ) as e :
201
197
raise KubeconfigInaccessableError (
202
- "Can't open kubeconfig for writing: {0}" . format ( e ) )
198
+ f "Can't open kubeconfig for writing: { e } " )
203
199
204
200
205
201
class KubeconfigAppender (object ):
206
- def insert_entry (self , config , key , entry ):
202
+ def insert_entry (self , config , key , new_entry ):
207
203
"""
208
- Insert entry into the array at content[key]
204
+ Insert entry into the entries list at content[key]
209
205
Overwrite an existing entry if they share the same name
210
206
211
207
:param config: The kubeconfig to insert an entry into
212
208
:type config: Kubeconfig
213
209
"""
214
- if key not in config .content :
215
- config .content [key ] = []
216
- array = config .content [key ]
217
- if not isinstance (array , list ):
218
- raise KubeconfigError ("Tried to insert into {0},"
219
- "which is a {1} "
220
- "not a {2}" .format (key ,
221
- type (array ),
222
- list ))
223
- found = False
224
- for counter , existing_entry in enumerate (array ):
225
- if "name" in existing_entry and \
226
- "name" in entry and \
227
- existing_entry ["name" ] == entry ["name" ]:
228
- array [counter ] = entry
229
- found = True
230
-
231
- if not found :
232
- array .append (entry )
233
-
234
- config .content [key ] = array
210
+ entries = self ._setdefault_existing_entries (config , key )
211
+ same_name_index = self ._index_same_name (entries , new_entry )
212
+ if same_name_index is None :
213
+ entries .append (new_entry )
214
+ else :
215
+ entries [same_name_index ]= new_entry
235
216
return config
236
217
237
- def _make_context (self , cluster , user , alias = None ):
218
+ def _setdefault_existing_entries (self , config , key ):
219
+ config .content [key ]= config .content .get (key ) or []
220
+ entries = config .content [key ]
221
+ if not isinstance (entries , list ):
222
+ raise KubeconfigError (f"Tried to insert into { key } , "
223
+ f"which is a { type (entries )} "
224
+ f"not a { list } " )
225
+ return entries
226
+
227
+ def _index_same_name (self , entries , new_entry ):
228
+ if "name" in new_entry :
229
+ name_to_search = new_entry ["name" ]
230
+ for i , entry in enumerate (entries ):
231
+ if "name" in entry and entry ["name" ] == name_to_search :
232
+ return i
233
+ return None
234
+
235
+ def _make_context (self , cluster , user , alias = None ):
238
236
""" Generate a context to associate cluster and user with a given alias."""
239
237
return OrderedDict ([
240
238
("context" , OrderedDict ([
@@ -244,7 +242,7 @@ def _make_context(self, cluster, user, alias=None):
244
242
("name" , alias or user ["name" ])
245
243
])
246
244
247
- def insert_cluster_user_pair (self , config , cluster , user , alias = None ):
245
+ def insert_cluster_user_pair (self , config , cluster , user , alias = None ):
248
246
"""
249
247
Insert the passed cluster entry and user entry,
250
248
then make a context to associate them
@@ -266,11 +264,11 @@ def insert_cluster_user_pair(self, config, cluster, user, alias=None):
266
264
:return: The generated context
267
265
:rtype: OrderedDict
268
266
"""
269
- context = self ._make_context (cluster , user , alias = alias )
267
+ context = self ._make_context (cluster , user , alias = alias )
270
268
self .insert_entry (config , "clusters" , cluster )
271
269
self .insert_entry (config , "users" , user )
272
270
self .insert_entry (config , "contexts" , context )
273
271
274
- config .content ["current-context" ] = context ["name" ]
272
+ config .content ["current-context" ]= context ["name" ]
275
273
276
274
return context
0 commit comments