23
23
from re import match as re_match
24
24
from typing import Any , Union
25
25
26
- from jsonschema import ValidationError , validate , validators
27
-
28
26
from ardupilot_methodic_configurator import _
27
+ from ardupilot_methodic_configurator .backend_filesystem_json_with_schema import FilesystemJSONWithSchema
29
28
from ardupilot_methodic_configurator .backend_filesystem_program_settings import ProgramSettings
30
29
from ardupilot_methodic_configurator .data_model_vehicle_components_json_schema import VehicleComponentsJsonSchema
31
30
from ardupilot_methodic_configurator .middleware_template_overview import TemplateOverview
@@ -35,10 +34,9 @@ class VehicleComponents:
35
34
"""Load and save vehicle components configurations from a JSON file."""
36
35
37
36
def __init__ (self , save_component_to_system_templates : bool = False ) -> None :
38
- self .vehicle_components_json_filename = "vehicle_components.json"
39
- self .vehicle_components_schema_filename = "vehicle_components_schema.json"
40
- self .vehicle_components : Union [None , dict [str , Any ]] = None
41
- self .schema : Union [None , dict [Any , Any ]] = None
37
+ self .vehicle_components_fs : FilesystemJSONWithSchema = FilesystemJSONWithSchema (
38
+ "vehicle_components.json" , "vehicle_components_schema.json"
39
+ )
42
40
self .save_component_to_system_templates = save_component_to_system_templates
43
41
44
42
def load_schema (self ) -> dict :
@@ -47,35 +45,7 @@ def load_schema(self) -> dict:
47
45
48
46
:return: The schema as a dictionary
49
47
"""
50
- if self .schema is not None :
51
- return self .schema
52
-
53
- # Determine the location of the schema file
54
- schema_path = os_path .join (os_path .dirname (__file__ ), self .vehicle_components_schema_filename )
55
-
56
- try :
57
- with open (schema_path , encoding = "utf-8" ) as file :
58
- loaded_schema : dict [Any , Any ] = json_load (file )
59
-
60
- # Validate the schema itself against the JSON Schema meta-schema
61
- try :
62
- # Get the Draft7Validator class which has the META_SCHEMA property
63
- validator_class = validators .Draft7Validator
64
- meta_schema = validator_class .META_SCHEMA
65
-
66
- # Validate the loaded schema against the meta-schema
67
- validate (instance = loaded_schema , schema = meta_schema )
68
- logging_debug (_ ("Schema file '%s' is valid." ), schema_path )
69
- except Exception as e : # pylint: disable=broad-exception-caught
70
- logging_error (_ ("Schema file '%s' is not a valid JSON Schema: %s" ), schema_path , str (e ))
71
-
72
- self .schema = loaded_schema
73
- return self .schema
74
- except FileNotFoundError :
75
- logging_error (_ ("Schema file '%s' not found." ), schema_path )
76
- except JSONDecodeError :
77
- logging_error (_ ("Error decoding JSON schema from file '%s'." ), schema_path )
78
- return {}
48
+ return self .vehicle_components_fs .load_schema ()
79
49
80
50
def load_component_templates (self ) -> dict [str , list [dict ]]:
81
51
"""
@@ -285,113 +255,42 @@ def validate_vehicle_components(self, data: dict) -> tuple[bool, str]:
285
255
:param data: The vehicle components data to validate
286
256
:return: A tuple of (is_valid, error_message)
287
257
"""
288
- schema = self .load_schema ()
289
- if not schema :
290
- return False , _ ("Could not load validation schema" )
291
-
292
- try :
293
- validate (instance = data , schema = schema )
294
- return True , ""
295
- except ValidationError as e :
296
- return False , f"{ _ ('Validation error' )} : { e .message } "
258
+ return self .vehicle_components_fs .validate_json_against_schema (data )
297
259
298
260
def load_vehicle_components_json_data (self , vehicle_dir : str ) -> dict [Any , Any ]:
299
- data : dict [Any , Any ] = {}
300
- filepath = os_path .join (vehicle_dir , self .vehicle_components_json_filename )
301
- try :
302
- with open (filepath , encoding = "utf-8" ) as file :
303
- data = json_load (file )
261
+ return self .vehicle_components_fs .load_json_data (vehicle_dir )
304
262
305
- # Validate the loaded data against the schema
306
- is_valid , error_message = self .validate_vehicle_components (data )
307
- if not is_valid :
308
- logging_error (_ ("Invalid vehicle components file '%s': %s" ), filepath , error_message )
309
- # We still return the data even if invalid for debugging purposes
310
- except FileNotFoundError :
311
- # Normal users do not need this information
312
- logging_debug (_ ("File '%s' not found in %s." ), self .vehicle_components_json_filename , vehicle_dir )
313
- except JSONDecodeError :
314
- logging_error (_ ("Error decoding JSON data from file '%s'." ), filepath )
315
- self .vehicle_components = data
316
- return data
317
-
318
- def save_vehicle_components_json_data (self , data : dict , vehicle_dir : str ) -> tuple [bool , str ]: # noqa: PLR0911 # pylint: disable=too-many-return-statements
263
+ def save_vehicle_components_json_data (self , data : dict , vehicle_dir : str ) -> tuple [bool , str ]:
319
264
"""
320
265
Save the vehicle components data to a JSON file.
321
266
322
267
:param data: The vehicle components data to save
323
268
:param vehicle_dir: The directory to save the file in
324
269
:return: A tuple of (error_occurred, error_message)
325
270
"""
326
- # Validate before saving
327
- # commented out until https://github.com/ArduPilot/MethodicConfigurator/pull/237 gets merged
328
- # is_valid, error_message = self.validate_vehicle_components(data)
329
- # if not is_valid:
330
- # msg = _("Cannot save invalid vehicle components data: {}").format(error_message)
331
- # logging_error(msg)
332
- # return True, msg
333
-
334
- filepath = os_path .join (vehicle_dir , self .vehicle_components_json_filename )
335
- try :
336
- with open (filepath , "w" , encoding = "utf-8" , newline = "\n " ) as file :
337
- json_dump (data , file , indent = 4 )
338
- except FileNotFoundError :
339
- msg = _ ("Directory '{}' not found" ).format (vehicle_dir )
340
- logging_error (msg )
341
- return True , msg
342
- except PermissionError :
343
- msg = _ ("Permission denied when writing to file '{}'" ).format (filepath )
344
- logging_error (msg )
345
- return True , msg
346
- except IsADirectoryError :
347
- msg = _ ("Path '{}' is a directory, not a file" ).format (filepath )
348
- logging_error (msg )
349
- return True , msg
350
- except OSError as e :
351
- msg = _ ("OS error when writing to file '{}': {}" ).format (filepath , str (e ))
352
- logging_error (msg )
353
- return True , msg
354
- except TypeError as e :
355
- msg = _ ("Type error when serializing data to JSON: {}" ).format (str (e ))
356
- logging_error (msg )
357
- return True , msg
358
- except ValueError as e :
359
- msg = _ ("Value error when serializing data to JSON: {}" ).format (str (e ))
360
- logging_error (msg )
361
- return True , msg
362
- except Exception as e : # pylint: disable=broad-exception-caught
363
- # Still have a fallback for truly unexpected errors
364
- msg = _ ("Unexpected error saving data to file '{}': {}" ).format (filepath , str (e ))
365
- logging_error (msg )
366
- return True , msg
367
-
368
- return False , ""
271
+ return self .vehicle_components_fs .save_json_data (data , vehicle_dir )
369
272
370
273
def get_fc_fw_type_from_vehicle_components_json (self ) -> str :
371
- if self .vehicle_components and "Components" in self .vehicle_components :
372
- components = self .vehicle_components ["Components" ]
373
- else :
374
- components = None
274
+ data = self .vehicle_components_fs .data
275
+ components = data ["Components" ] if data and "Components" in data else None
375
276
if components :
376
277
fw_type : str = components .get ("Flight Controller" , {}).get ("Firmware" , {}).get ("Type" , "" )
377
278
if fw_type in self .supported_vehicles ():
378
279
return fw_type
379
- error_msg = _ ("Firmware type {fw_type} in {self.vehicle_components_json_filename } is not supported" )
380
- logging_error (error_msg .format (** locals () ))
280
+ error_msg = _ ("Firmware type {fw_type} in {filename } is not supported" )
281
+ logging_error (error_msg .format (fw_type = fw_type , filename = self . vehicle_components_fs . json_filename ))
381
282
return ""
382
283
383
284
def get_fc_fw_version_from_vehicle_components_json (self ) -> str :
384
- if self .vehicle_components and "Components" in self .vehicle_components :
385
- components = self .vehicle_components ["Components" ]
386
- else :
387
- components = None
285
+ data = self .vehicle_components_fs .data
286
+ components = data ["Components" ] if data and "Components" in data else None
388
287
if components :
389
288
version_str : str = components .get ("Flight Controller" , {}).get ("Firmware" , {}).get ("Version" , "" )
390
289
version_str = version_str .lstrip ().split (" " )[0 ] if version_str else ""
391
290
if re_match (r"^\d+\.\d+\.\d+$" , version_str ):
392
291
return version_str
393
- error_msg = _ ("FW version string {version_str} on {self.vehicle_components_json_filename } is invalid" )
394
- logging_error (error_msg .format (** locals () ))
292
+ error_msg = _ ("FW version string {version_str} on {filename } is invalid" )
293
+ logging_error (error_msg .format (version_str = version_str , filename = self . vehicle_components_fs . json_filename ))
395
294
return ""
396
295
397
296
@staticmethod
@@ -410,7 +309,7 @@ def get_vehicle_components_overviews() -> dict[str, TemplateOverview]:
410
309
:return: A dictionary mapping subdirectory paths to TemplateOverview instances.
411
310
"""
412
311
vehicle_components_dict = {}
413
- file_to_find = VehicleComponents ().vehicle_components_json_filename
312
+ file_to_find = VehicleComponents ().vehicle_components_fs . json_filename
414
313
template_default_dir = ProgramSettings .get_templates_base_dir ()
415
314
for root , _dirs , files in os_walk (template_default_dir ):
416
315
if file_to_find in files :
@@ -437,8 +336,9 @@ def wipe_component_info(self) -> None:
437
336
Preserves the complete structure of the dictionary including all branches and leaves,
438
337
but sets leaf values to empty values based on their type.
439
338
"""
440
- if self .vehicle_components is not None :
441
- self ._recursively_clear_dict (self .vehicle_components )
339
+ data = self .vehicle_components_fs .data
340
+ if data is not None :
341
+ self ._recursively_clear_dict (data )
442
342
443
343
def _recursively_clear_dict (self , data : Union [dict , list , float , bool , str ]) -> None :
444
344
"""
0 commit comments