forked from ArduPilot/MethodicConfigurator
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_model_vehicle_components_base.py
More file actions
404 lines (336 loc) · 16.6 KB
/
data_model_vehicle_components_base.py
File metadata and controls
404 lines (336 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
"""
Data model for vehicle components.
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
SPDX-FileCopyrightText: 2024-2026 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
SPDX-License-Identifier: GPL-3.0-or-later
"""
# from logging import debug as logging_debug
from copy import deepcopy
from logging import error as logging_error
from logging import warning as logging_warning
from typing import Any, Optional, Union
from ardupilot_methodic_configurator import _, __version__
from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem
from ardupilot_methodic_configurator.battery_cell_voltages import BatteryCell
from ardupilot_methodic_configurator.data_model_vehicle_components_json_schema import VehicleComponentsJsonSchema
# Type aliases to improve code readability
ComponentPath = tuple[str, ...]
ComponentData = dict[str, Any]
ComponentValue = Union[str, int, float]
class ComponentDataModelBase:
"""
A class to handle component data operations separate from UI logic.
This improves testability by isolating data operations.
"""
def __init__(
self, initial_data: ComponentData, component_datatypes: dict[str, Any], schema: VehicleComponentsJsonSchema
) -> None:
self._data: ComponentData = deepcopy(initial_data) if initial_data else {"Components": {}, "Format version": 1}
self._battery_chemistry: str = ""
self._possible_choices: dict[ComponentPath, tuple[str, ...]] = {}
self._mot_pwm_types: tuple[str, ...] = ()
self._component_datatypes: dict[str, Any] = component_datatypes
self._is_new_project: bool = False
self.schema: VehicleComponentsJsonSchema = schema
def get_component_data(self) -> ComponentData:
"""
Get the complete component data.
Only used in pytest code
"""
return self._data
def get_component_value(self, path: ComponentPath) -> Union[ComponentData, ComponentValue]:
"""Get a specific component value from the data structure."""
data_path = self._data["Components"]
for key in path:
if key not in data_path:
empty_dict: dict[str, Any] = {}
return empty_dict
data_path = data_path[key]
# Ensure we return a value that matches our ComponentValue type
if isinstance(data_path, (str, int, float, dict)):
return data_path
# If it's some other type, convert to string
return str(data_path)
def set_component_value(self, path: ComponentPath, value: Union[ComponentData, ComponentValue, None]) -> None:
"""Set a specific component value in the data structure."""
if value is None:
value = ""
# Ensure Components key exists
if "Components" not in self._data:
self._data["Components"] = {}
data_path: ComponentData = self._data["Components"]
# Navigate to the correct place in the data structure
for key in path[:-1]:
if key not in data_path:
data_path[key] = {}
data_path = data_path[key]
# Update the value using type-safe casting
datatype = self._get_component_datatype(path)
if datatype:
data_path[path[-1]] = self._safe_cast_value(value, datatype, path)
else: # fallback to a less intelligent method
# If the component has a specific datatype, use it to process the value
data_path[path[-1]] = self._process_value(path, str(value) if value is not None else None)
def _get_component_datatype(self, path: ComponentPath) -> Optional[type]:
"""
Safely get the Python datatype for a component path from the nested datatypes dictionary.
Args:
path: The component path tuple (e.g., ("Battery", "Specifications", "Capacity mAh"))
Returns:
The Python type if found, None otherwise
"""
if not self._component_datatypes or len(path) < 3:
return None
try:
component_type = path[0]
section = path[1]
field = path[2]
result = self._component_datatypes.get(component_type, {}).get(section, {}).get(field)
# Ensure we return a type or None
return result if isinstance(result, type) else None
except (KeyError, AttributeError, TypeError):
return None
def _safe_cast_value( # noqa: PLR0911 pylint: disable=too-many-return-statements
self, value: Union[ComponentData, ComponentValue, None], datatype: type, path: ComponentPath
) -> Any: # noqa: ANN401 # Use Any to handle dict/list returns that don't fit ComponentValue
"""
Safely cast a value to the specified datatype with proper error handling.
Args:
value: The value to cast
datatype: The target Python type
path: The component path for error context
Returns:
The cast value, or falls back to _process_value on error
"""
if value is None:
# Handle None values based on datatype
if datatype is str:
return ""
if datatype in (int, float):
return datatype(0)
if datatype is bool:
return False
if datatype is list:
return []
if datatype is dict:
return {}
return ""
# If already the correct type, return as-is
if isinstance(value, datatype):
return value
try:
# Special handling for boolean conversion
if datatype is bool:
if isinstance(value, str):
return value.lower() in ("true", "1", "yes", "on")
return bool(value)
# Special handling for list/dict types
if datatype in (list, dict):
type_name = getattr(datatype, "__name__", repr(datatype))
logging_warning(
_("Failed to cast value '%s' to %s for path %s: %s"),
value,
type_name,
path,
"list and dict types require structured data",
)
return self._process_value(path, str(value) if value is not None else None)
# Standard type conversion
return datatype(value)
except (ValueError, TypeError, AttributeError) as e:
# Log the error and fall back to the original processing method
type_name = getattr(datatype, "__name__", repr(datatype))
logging_warning(_("Failed to cast value '%s' to %s for path %s: %s"), value, type_name, path, e)
return self._process_value(path, str(value) if value is not None else None)
def _process_value(self, path: ComponentPath, value: Union[str, None]) -> ComponentValue:
"""Process a string value into the appropriate type based on context."""
# Handle None value
if value is None:
return ""
# Special handling for Version fields
if path[-1] != "Version":
try:
return int(value)
except ValueError:
try:
return float(value)
except ValueError:
return str(value).strip()
return str(value).strip()
def get_all_components(self) -> ComponentData:
"""Get all components data."""
empty_dict: ComponentData = {}
return self._data.get("Components", empty_dict) # type: ignore[no-any-return]
def has_components(self) -> bool:
"""Check if there are any components in the data."""
return len(self.get_all_components()) >= 1
def save_to_filesystem(self, filesystem: LocalFilesystem) -> tuple[bool, str]:
"""Save component data to filesystem - centralizes save logic."""
return filesystem.save_vehicle_components_json_data(self.get_component_data(), filesystem.vehicle_dir)
def post_init(self, doc_dict: dict) -> None:
"""Update the data structure to ensure all required fields are present."""
self.update_json_structure()
self.init_possible_choices(doc_dict)
self.init_battery_chemistry()
self.correct_display_values_in_loaded_data()
def update_json_structure(self) -> None:
"""
Update the data structure to ensure all required fields are present.
Used to update old JSON files to the latest format.
"""
# Define the default structure with all required fields
default_structure = {
"Format version": 1,
"Program version": __version__,
"Components": {
"Battery": {
"Specifications": {
"Chemistry": "Lipo",
"Capacity mAh": 0,
}
},
"Frame": {
"Specifications": {
"TOW min Kg": 1,
"TOW max Kg": 1,
}
},
"Flight Controller": {
"Product": {},
"Firmware": {},
"Specifications": {"MCU Series": "Unknown"},
"Notes": "",
},
},
}
# Handle legacy field renaming before merging
if "GNSS receiver" in self._data.get("Components", {}):
components = self._data.setdefault("Components", {})
components["GNSS Receiver"] = components.pop("GNSS receiver")
# Handle legacy battery monitor protocol migration for protocols that don't need hardware connections
# This is a local import to avoid a circular import dependency
from ardupilot_methodic_configurator.data_model_vehicle_components_validation import ( # pylint: disable=import-outside-toplevel, cyclic-import # noqa: PLC0415
BATT_MONITOR_CONNECTION,
OTHER_PORTS,
)
# Calculate protocols that use OTHER_PORTS (don't require specific hardware connections)
battmon_other_protocols = {
str(value["protocol"]) for value in BATT_MONITOR_CONNECTION.values() if value.get("type") == OTHER_PORTS
}
battery_monitor_protocol = (
self._data.get("Components", {}).get("Battery Monitor", {}).get("FC Connection", {}).get("Protocol")
)
if battery_monitor_protocol in battmon_other_protocols:
# These protocols don't require specific hardware connections, so we can safely migrate them
battery_monitor = self._data.setdefault("Components", {}).setdefault("Battery Monitor", {})
battery_monitor.setdefault("FC Connection", {})["Type"] = "other"
# Handle GNSS protocol name migration from older versions
# Protocol names were made more descriptive with manufacturer names
gnss_protocol_migration = {
"SBF": "Septentrio(SBF)",
"GSOF": "Trimble(GSOF)",
"SBF-DualAntenna": "Septentrio-DualAntenna(SBF)",
}
gnss_receiver_protocol = (
self._data.get("Components", {}).get("GNSS Receiver", {}).get("FC Connection", {}).get("Protocol")
)
if gnss_receiver_protocol in gnss_protocol_migration:
# Migrate to new protocol name
gnss_receiver = self._data.setdefault("Components", {}).setdefault("GNSS Receiver", {})
gnss_receiver.setdefault("FC Connection", {})["Protocol"] = gnss_protocol_migration[gnss_receiver_protocol]
# Merge existing data onto default structure (preserves existing values)
self._data = self._deep_merge_dicts(default_structure, self._data)
self._data["Components"] = self._reorder_components(self._data.get("Components", {}))
self._data["Program version"] = __version__
def _deep_merge_dicts(self, default: dict[str, Any], existing: dict[str, Any]) -> dict[str, Any]:
"""
Deep merge two dictionaries, preserving existing values and key order.
Args:
default: Default structure with fallback values
existing: Existing data to preserve
Returns:
Merged dictionary with existing values taking precedence and preserving existing key order
"""
# Start with existing dictionary to preserve its key order
result = existing.copy()
# Add any missing keys from the default structure
for key, value in default.items():
if key not in result:
# Key doesn't exist in existing data, add it from default
result[key] = value
elif isinstance(result[key], dict) and isinstance(value, dict):
# Both are dictionaries, recursively merge them
result[key] = self._deep_merge_dicts(value, result[key])
# If key exists in result but isn't a dict, keep the existing value (no change needed)
return result
def _reorder_components(self, existing_components: ComponentData) -> ComponentData:
"""
Reorder components according to the desired structure while preserving existing data.
Args:
existing_components: The existing components dictionary
Returns:
A new dictionary with components reordered according to the desired structure
"""
desired_component_order = [
"Flight Controller",
"Frame",
"Battery Monitor",
"Battery",
"ESC",
"Motors",
"Propellers",
"GNSS Receiver",
"RC Controller",
"RC Transmitter",
"RC Receiver",
"Telemetry",
]
# Create reordered components dict
reordered_components = {}
remaining_components = existing_components.copy()
# First, add components in the desired order
for component_name in desired_component_order:
if component_name in remaining_components:
reordered_components[component_name] = remaining_components.pop(component_name)
# Then add any remaining unknown components at the end
reordered_components.update(remaining_components)
# Second step: for each component, ensure Product fields are in correct order (Version before URL)
for component_name, component_data in reordered_components.items():
if "Product" in component_data and isinstance(component_data["Product"], dict):
product = component_data["Product"]
if "Version" in product and "URL" in product:
# Create new ordered product dict
ordered_product = {}
# Add fields in desired order
for field in ["Manufacturer", "Model", "Version", "URL"]:
if field in product:
ordered_product[field] = product[field]
# Add any remaining fields
for field, value in product.items():
if field not in ordered_product:
ordered_product[field] = value
reordered_components[component_name]["Product"] = ordered_product
return reordered_components
def init_battery_chemistry(self) -> None:
self._battery_chemistry = (
self._data.get("Components", {}).get("Battery", {}).get("Specifications", {}).get("Chemistry", "")
)
if self._battery_chemistry not in BatteryCell.chemistries():
logging_error(_("Invalid battery chemistry %s, defaulting to Lipo"), self._battery_chemistry)
self._battery_chemistry = "Lipo"
def init_possible_choices(self, doc_dict: dict[str, Any]) -> None:
"""Initialize possible choices for validation rules."""
# this method should be implemented in the ComponentDataModelValidation mixedin
def correct_display_values_in_loaded_data(self) -> None:
"""Correct display values stored in JSON during model initialization."""
# this method should be implemented in the ComponentDataModelValidation mixedin
def get_combobox_values_for_path(self, path: ComponentPath) -> tuple[str, ...]:
"""Get valid combobox values for a given path."""
return self._possible_choices.get(path, ())
def set_configuration_template(self, vehicle_template_name: str) -> None:
"""Set the vehicle configuration template name in the data."""
self._data["Configuration template"] = vehicle_template_name
self._is_new_project = True
def is_new_project(self) -> bool:
"""Check if the project is new."""
return self._is_new_project