23
23
import typing
24
24
25
25
from sambacc import config
26
- from sambacc import jfile
27
26
from sambacc import samba_cmds
27
+ from sambacc .jfile import ClusterMetaJSONFile
28
28
from sambacc .netcmd_loader import template_config
29
29
from sambacc .typelets import ExcType , ExcValue , ExcTraceback
30
30
39
39
CTDB_NODES : str = "/etc/ctdb/nodes"
40
40
41
41
42
+ class ClusterMetaObject (typing .Protocol ):
43
+ def load (self ) -> typing .Any :
44
+ ...
45
+
46
+ def dump (self , data : typing .Any ) -> None :
47
+ ...
48
+
49
+
50
+ class ClusterMeta (typing .Protocol ):
51
+ def open (
52
+ self , * , read : bool = True , write : bool = False , locked : bool = False
53
+ ) -> typing .ContextManager [ClusterMetaObject ]:
54
+ ...
55
+
56
+
42
57
class NodeState (str , enum .Enum ):
43
58
NEW = "new"
44
59
READY = "ready"
@@ -184,11 +199,26 @@ def add_node_to_statefile(
184
199
state file, located at `path`. If in_nodes is true, the state file will
185
200
reflect that the node is already added to the CTDB nodes file.
186
201
"""
187
- with jfile .open (path , jfile .OPEN_RW ) as fh :
188
- jfile .flock (fh )
189
- data = jfile .load (fh , {})
202
+ add_node_to_cluster_meta (
203
+ ClusterMetaJSONFile (path ), identity , node , pnn , in_nodes = in_nodes
204
+ )
205
+
206
+
207
+ def add_node_to_cluster_meta (
208
+ cmeta : ClusterMeta ,
209
+ identity : str ,
210
+ node : str ,
211
+ pnn : int ,
212
+ in_nodes : bool = False ,
213
+ ) -> None :
214
+ """Add the given node's identity, (node) IP, and PNN to the cluster
215
+ metadata. If in_nodes is true, the state file will reflect that the node
216
+ is already added to the CTDB nodes file.
217
+ """
218
+ with cmeta .open (write = True , locked = True ) as cmo :
219
+ data = cmo .load ()
190
220
_update_statefile (data , identity , node , pnn , in_nodes = in_nodes )
191
- jfile .dump (data , fh )
221
+ cmo .dump (data )
192
222
193
223
194
224
def refresh_node_in_statefile (
@@ -197,11 +227,21 @@ def refresh_node_in_statefile(
197
227
"""Assuming the node is already in the statefile, update the state in
198
228
the case that the node (IP) has changed.
199
229
"""
200
- with jfile .open (path , jfile .OPEN_RW ) as fh :
201
- jfile .flock (fh )
202
- data = jfile .load (fh , {})
230
+ refresh_node_in_cluster_meta (
231
+ ClusterMetaJSONFile (path ), identity , node , pnn
232
+ )
233
+
234
+
235
+ def refresh_node_in_cluster_meta (
236
+ cmeta : ClusterMeta , identity : str , node : str , pnn : int
237
+ ) -> None :
238
+ """Assuming the node is already in the cluster metadata, update the state
239
+ in the case that the node (IP) has changed.
240
+ """
241
+ with cmeta .open (write = True , locked = True ) as cmo :
242
+ data = cmo .load ()
203
243
_refresh_statefile (data , identity , node , pnn )
204
- jfile .dump (data , fh )
244
+ cmo .dump (data )
205
245
206
246
207
247
def _update_statefile (
@@ -268,13 +308,19 @@ def pnn_in_nodes(pnn: int, nodes_json: str, real_path: str) -> bool:
268
308
"""Returns true if the specified pnn has an entry in the nodes json
269
309
file and that the node is already added to the ctdb nodes file.
270
310
"""
271
- with jfile .open (nodes_json , jfile .OPEN_RO ) as fh :
272
- jfile .flock (fh )
273
- json_data = jfile .load (fh , {})
274
- current_nodes = json_data .get ("nodes" , [])
275
- for entry in current_nodes :
276
- if pnn == entry ["pnn" ] and _get_state_ok (entry ):
277
- return True
311
+ return pnn_in_cluster_meta (ClusterMetaJSONFile (nodes_json ), pnn )
312
+
313
+
314
+ def pnn_in_cluster_meta (cmeta : ClusterMeta , pnn : int ) -> bool :
315
+ """Returns true if the specified pnn has an entry in the cluster metadata
316
+ and that entry is ready for use.
317
+ """
318
+ with cmeta .open (locked = True ) as cmo :
319
+ json_data = cmo .load ()
320
+ current_nodes = json_data .get ("nodes" , [])
321
+ for entry in current_nodes :
322
+ if pnn == entry ["pnn" ] and _get_state_ok (entry ):
323
+ return True
278
324
return False
279
325
280
326
@@ -287,19 +333,19 @@ def manage_nodes(
287
333
"""Monitor nodes json for updates, reflecting those changes into ctdb."""
288
334
while True :
289
335
_logger .info ("checking if node is able to make updates" )
290
- if _node_check (pnn , nodes_json , real_path ):
336
+ cmeta = ClusterMetaJSONFile (nodes_json )
337
+ if _node_check (cmeta , pnn , real_path ):
291
338
_logger .info ("checking for node updates" )
292
- if _node_update (nodes_json , real_path ):
339
+ if _node_update (cmeta , real_path ):
293
340
_logger .info ("updated nodes" )
294
341
else :
295
342
_logger .warning ("node can not make updates" )
296
343
pause_func ()
297
344
298
345
299
- def _node_check (pnn : int , nodes_json : str , real_path : str ) -> bool :
300
- with jfile .open (nodes_json , jfile .OPEN_RO ) as fh :
301
- jfile .flock (fh )
302
- desired = jfile .load (fh , {}).get ("nodes" , [])
346
+ def _node_check (cmeta : ClusterMeta , pnn : int , real_path : str ) -> bool :
347
+ with cmeta .open (locked = True ) as cmo :
348
+ desired = cmo .load ().get ("nodes" , [])
303
349
ctdb_nodes = read_ctdb_nodes (real_path )
304
350
# first: check to see if the current node is in the nodes file
305
351
try :
@@ -317,7 +363,7 @@ def _node_check(pnn: int, nodes_json: str, real_path: str) -> bool:
317
363
318
364
319
365
def _node_update_check (
320
- json_data : dict [str , typing .Any ], nodes_json : str , real_path : str
366
+ json_data : dict [str , typing .Any ], real_path : str
321
367
) -> tuple [list [str ], list [typing .Any ], list [typing .Any ]]:
322
368
desired = json_data .get ("nodes" , [])
323
369
ctdb_nodes = read_ctdb_nodes (real_path )
@@ -358,27 +404,25 @@ def _entry_to_node(ctdb_nodes: list[str], entry: dict[str, typing.Any]) -> str:
358
404
return entry ["node" ]
359
405
360
406
361
- def _node_update (nodes_json : str , real_path : str ) -> bool :
407
+ def _node_update (cmeta : ClusterMeta , real_path : str ) -> bool :
362
408
# open r/o so that we don't initailly open for write. we do a probe and
363
409
# decide if anything needs to be updated if we are wrong, its not a
364
410
# problem, we'll "time out" and reprobe later
365
- with jfile .open (nodes_json , jfile .OPEN_RO ) as fh :
366
- jfile .flock (fh )
367
- json_data = jfile .load (fh , {})
411
+ with cmeta .open (locked = True ) as cmo :
412
+ json_data = cmo .load ()
368
413
_ , test_chg_nodes , test_need_reload = _node_update_check (
369
- json_data , nodes_json , real_path
414
+ json_data , real_path
370
415
)
371
416
if not test_chg_nodes and not test_need_reload :
372
417
_logger .info ("examined nodes state - no changes" )
373
418
return False
374
419
# we probably need to make a change. but we recheck our state again
375
420
# under lock, with the data file open r/w
376
421
# update the nodes file and make changes to ctdb
377
- with jfile .open (nodes_json , jfile .OPEN_RW ) as fh :
378
- jfile .flock (fh )
379
- json_data = jfile .load (fh , {})
422
+ with cmeta .open (write = True , locked = True ) as cmo :
423
+ json_data = cmo .load ()
380
424
ctdb_nodes , chg_nodes , need_reload = _node_update_check (
381
- json_data , nodes_json , real_path
425
+ json_data , real_path
382
426
)
383
427
if not chg_nodes and not need_reload :
384
428
_logger .info ("reexamined nodes state - no changes" )
@@ -414,9 +458,7 @@ def _node_update(nodes_json: str, real_path: str) -> bool:
414
458
entry ["state" ],
415
459
)
416
460
)
417
- jfile .dump (json_data , fh )
418
- fh .flush ()
419
- os .fsync (fh )
461
+ cmo .dump (json_data )
420
462
return True
421
463
422
464
0 commit comments