forked from ArduPilot/MethodicConfigurator
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbackend_filesystem_configuration_steps.py
More file actions
327 lines (287 loc) · 15.6 KB
/
backend_filesystem_configuration_steps.py
File metadata and controls
327 lines (287 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
"""
Manages configuration steps at the filesystem level.
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
SPDX-FileCopyrightText: 2024-2026 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
SPDX-License-Identifier: GPL-3.0-or-later
"""
from json import JSONDecodeError
from json import load as json_load
from logging import error as logging_error
from logging import info as logging_info
from logging import warning as logging_warning
from os import path as os_path
from typing import Optional, TypedDict
# from sys import exit as sys_exit
# from logging import debug as logging_debug
from jsonschema import validate as json_validate
from jsonschema.exceptions import ValidationError
from ardupilot_methodic_configurator import _
from ardupilot_methodic_configurator.data_model_par_dict import Par, ParDict
class PhaseData(TypedDict, total=False):
    """
    Dictionary layout describing one configuration phase.

    Declared with ``total=False``, so every key is optional.

    Keys:
        start: Number of the first file that belongs to the phase
        end: Number of the file at which the phase ends (computed, not read from the file)
        weight: Relative weight used for UI layout proportions (computed)
        description: Human-readable description of the phase
        optional: Whether the phase is optional

    """

    # File number at which the phase begins
    start: int
    # File number at which the phase ends (computed)
    end: int
    # UI layout weight (computed)
    weight: int
    # Free-text description of the phase
    description: str
    # True when the phase is optional
    optional: bool
class ConfigurationSteps:
    """
    A class to manage configuration steps for the ArduPilot methodic configurator.

    This class provides methods for reading and validating configuration steps, including forced and derived parameters.
    It is designed to simplify the interaction with configuration steps for managing ArduPilot configuration files.

    Attributes:
        configuration_steps_filename (str): The name of the file containing documentation for the configuration files.
        configuration_steps (dict): A dictionary containing the configuration steps, keyed by parameter filename.
        configuration_phases (dict): A dictionary containing the configuration phases, keyed by phase name.
        forced_parameters (dict): Computed forced parameters, keyed by parameter filename.
        derived_parameters (dict): Computed derived parameters, keyed by parameter filename.
        log_loaded_file (bool): When True, re_init() logs which directory the steps file was loaded from.

    """

    def __init__(self, _vehicle_dir: str, vehicle_type: str) -> None:
        """
        Create an empty instance; no file is read until re_init() is called.

        Args:
            _vehicle_dir: Unused here, accepted for signature compatibility with re_init()
            vehicle_type: Vehicle type used to build the configuration steps filename

        """
        self.configuration_steps_filename = "configuration_steps_" + vehicle_type + ".json"
        self.configuration_steps: dict[str, dict] = {}
        self.configuration_phases: dict[str, PhaseData] = {}
        self.forced_parameters: dict[str, ParDict] = {}
        self.derived_parameters: dict[str, ParDict] = {}
        # Stays False until the first re_init() completes, so the first load is not logged
        self.log_loaded_file = False

    def re_init(self, vehicle_dir: str, vehicle_type: str) -> None:  # pylint: disable=too-many-branches
        """
        Load and validate the configuration steps file for the given vehicle type.

        The file is searched first in vehicle_dir and then in the directory of this module;
        the first file that can be opened wins. Its content is validated against
        configuration_steps_schema.json. Validation problems are logged but do not prevent the
        "steps" and "phases" sections from being used.

        Does nothing when vehicle_type is an empty string.

        Args:
            vehicle_dir: Directory of the vehicle, searched first
            vehicle_type: Vehicle type used to build the configuration steps filename

        """
        if vehicle_type == "":
            return
        self.configuration_steps_filename = "configuration_steps_" + vehicle_type + ".json"
        # Define a list of directories to search for the configuration_steps_filename file
        search_directories = [vehicle_dir, os_path.dirname(os_path.abspath(__file__))]
        file_found = False
        json_content = {}
        for i, directory in enumerate(search_directories):
            try:
                with open(os_path.join(directory, self.configuration_steps_filename), encoding="utf-8-sig") as file:
                    json_content = json_load(file)
                    file_found = True
                    if self.log_loaded_file:
                        if i == 0:
                            logging_warning(
                                _("Configuration steps '%s' loaded from %s (overwriting default configuration steps)."),
                                self.configuration_steps_filename,
                                directory,
                            )
                        if i == 1:
                            logging_info(
                                _("Configuration steps '%s' loaded from %s."), self.configuration_steps_filename, directory
                            )
                    # The first file successfully loaded takes precedence over later directories
                    break
            except FileNotFoundError:
                # Not in this directory, try the next one
                pass
            except JSONDecodeError as e:
                # A malformed file is not silently replaced by the next candidate
                logging_error(_("Error in file '%s': %s"), self.configuration_steps_filename, e)
                break

        # Validate the vehicle configuration steps file against the configuration_steps_schema.json schema
        if file_found:
            schema_file = os_path.join(os_path.dirname(os_path.abspath(__file__)), "configuration_steps_schema.json")
            try:
                with open(schema_file, encoding="utf-8") as schema:
                    schema_data = json_load(schema)
                    json_validate(instance=json_content, schema=schema_data)
            except FileNotFoundError:
                logging_error(_("Schema file '%s' not found"), schema_file)
            except ValidationError as e:
                logging_error(_("Configuration steps validation error: %s"), str(e))
            except JSONDecodeError as e:
                logging_error(_("Error in schema file '%s': %s"), schema_file, e)

        if file_found and "steps" in json_content:
            self.configuration_steps = json_content["steps"]
            for filename, file_info in self.configuration_steps.items():
                self.__validate_parameters_in_configuration_steps(filename, file_info, "forced")
                self.__validate_parameters_in_configuration_steps(filename, file_info, "derived")
        else:
            logging_warning(_("No configuration steps documentation and no forced and derived parameters will be available."))

        if file_found and "phases" in json_content:
            self.configuration_phases = json_content["phases"]
        else:
            logging_warning(_("No configuration phases documentation will be available."))
        # From now on, report where the file was loaded from
        self.log_loaded_file = True

    def __validate_parameters_in_configuration_steps(self, filename: str, file_info: dict, parameter_type: str) -> None:
        """
        Validates the parameters in the configuration steps.

        This method checks if the parameters in the configuration steps are correctly formatted.
        If a parameter is missing the 'New Value' or 'Change Reason' attribute, an error message is logged.
        Problems are only logged; nothing is raised or removed.

        Args:
            filename: Key of the configuration step being checked
            file_info: The configuration step content
            parameter_type: "forced" or "derived"; selects the "<parameter_type>_parameters" section

        """
        if parameter_type + "_parameters" in file_info:
            if not isinstance(file_info[parameter_type + "_parameters"], dict):
                logging_error(
                    _("Error in file '%s': '%s' %s parameter is not a dictionary"),
                    self.configuration_steps_filename,
                    filename,
                    parameter_type,
                )
                return
            for parameter, parameter_info in file_info[parameter_type + "_parameters"].items():
                if "New Value" not in parameter_info:
                    logging_error(
                        _("Error in file '%s': '%s' %s parameter '%s' 'New Value' attribute not found."),
                        self.configuration_steps_filename,
                        filename,
                        parameter_type,
                        parameter,
                    )
                if "Change Reason" not in parameter_info:
                    logging_error(
                        _("Error in file '%s': '%s' %s parameter '%s' 'Change Reason' attribute not found."),
                        self.configuration_steps_filename,
                        filename,
                        parameter_type,
                        parameter,
                    )

    def compute_parameters(  # pylint: disable=too-many-branches, too-many-arguments, too-many-positional-arguments
        self,
        filename: str,
        file_info: dict,
        parameter_type: str,
        variables: dict,
        ignore_fc_derived_param_warnings: bool = False,
    ) -> str:
        """
        Computes the forced or derived parameters for a given configuration file.

        If the parameter is forced, it is added to the forced_parameters dictionary.
        If the parameter is derived, it is added to the derived_parameters dictionary.

        Each 'New Value' expression is evaluated with `variables` as its local namespace.
        A string result is translated to a number using variables["doc_dict"] ("values" first, then "Bitmask").

        Args:
            filename: Parameter filename under which the computed parameters are stored
            file_info: The configuration step content for that file
            parameter_type: "forced" selects forced_parameters, anything else selects derived_parameters
            variables: Names available to the 'New Value' expressions
            ignore_fc_derived_param_warnings: Suppress the warnings for derived parameters that
                need 'fc_parameters' or that hit a math error

        Returns:
            An empty string when there was nothing to compute or no forced parameter failed.
            The error message of the first forced parameter that failed otherwise; processing stops there.
            Failing derived parameters are skipped and never produce a non-empty return value.

        """
        if parameter_type + "_parameters" not in file_info or not variables:
            return ""
        destination = self.forced_parameters if parameter_type == "forced" else self.derived_parameters
        for parameter, parameter_info in file_info[parameter_type + "_parameters"].items():
            # The error messages below are filled in with format(**locals()), so the local names
            # self, filename, parameter_type, parameter, math_error and _e must keep these exact names
            try:
                if ("fc_parameters" in str(parameter_info["New Value"])) and (
                    "fc_parameters" not in variables or variables["fc_parameters"] == {}
                ):
                    error_msg = _(
                        "In file '{self.configuration_steps_filename}': '{filename}' {parameter_type} "
                        "parameter '{parameter}' could not be computed: 'fc_parameters' not found, is an FC connected?"
                    )
                    error_msg = error_msg.format(**locals())
                    if parameter_type == "forced":
                        logging_error(error_msg)
                        return error_msg
                    if not ignore_fc_derived_param_warnings:
                        logging_warning(error_msg)
                    continue
                try:
                    # NOTE(review): eval() executes the 'New Value' expression as Python code. Only an empty
                    # globals dict is passed, which does not sandbox it; confirm the expressions always come
                    # from trusted configuration step files.
                    result = eval(str(parameter_info["New Value"]), {}, variables)  # noqa: S307 pylint: disable=eval-used
                except (ZeroDivisionError, ValueError) as math_error:
                    # Handle math errors like:
                    # - ZeroDivisionError: division by zero or 0.0 raised to negative power
                    # - ValueError: math domain error (e.g., Diameter_inches**-0.838 when Diameter_inches is 0)
                    # The exception must be bound to 'math_error': the message below references
                    # {math_error} and would otherwise fail to format with a KeyError
                    error_msg = _(
                        "In file '{self.configuration_steps_filename}': '{filename}' {parameter_type} "
                        "parameter '{parameter}' evaluation resulted in math error: {math_error}"
                    )
                    error_msg = error_msg.format(**locals())
                    if parameter_type == "forced":
                        logging_error(error_msg)
                        return error_msg
                    if not ignore_fc_derived_param_warnings:
                        logging_warning(error_msg)
                    continue

                # convert (combobox) string text to (parameter value) string int or float
                if isinstance(result, str):
                    if parameter in variables["doc_dict"]:
                        values = variables["doc_dict"][parameter]["values"]
                        if values:
                            # StopIteration (no matching text) is handled by the outer except
                            result = next(key for key, value in values.items() if value == result)
                        else:
                            bitmasks = variables["doc_dict"][parameter]["Bitmask"]
                            if bitmasks:
                                result = 2 ** next(key for key, bitmask in bitmasks.items() if bitmask == result)
                    else:
                        error_msg = _(
                            "In file '{self.configuration_steps_filename}': '{filename}' {parameter_type} "
                            "parameter '{parameter}' could not be computed, no documentation metadata available for it"
                        )
                        error_msg = error_msg.format(**locals())
                        if parameter_type == "forced":
                            logging_error(error_msg)
                            return error_msg
                        logging_warning(error_msg)
                        continue

                if filename not in destination:
                    destination[filename] = ParDict()
                change_reason = _(parameter_info["Change Reason"]) if parameter_info["Change Reason"] else ""
                destination[filename][parameter] = Par(float(result), change_reason)
            except (SyntaxError, NameError, KeyError, StopIteration) as _e:
                error_msg = _(
                    "In file '{self.configuration_steps_filename}': '{filename}' {parameter_type} "
                    "parameter '{parameter}' could not be computed: {_e}"
                )
                error_msg = error_msg.format(**locals())
                if parameter_type == "forced":
                    logging_error(error_msg)
                    return error_msg
                logging_warning(error_msg)
        return ""

    def auto_changed_by(self, selected_file: str) -> str:
        """
        Return the "auto_changed_by" entry of the selected file.

        Returns:
            The entry converted to str, or an empty string when the file or the entry is unknown

        """
        if selected_file in self.configuration_steps:
            return str(self.configuration_steps[selected_file].get("auto_changed_by", ""))
        return ""

    def jump_possible(self, selected_file: str) -> dict[str, str]:
        """
        Return a copy of the "jump_possible" entry of the selected file.

        Returns:
            A new dict built from the entry, or an empty dict when the file or the entry is unknown

        """
        if selected_file in self.configuration_steps:
            return dict(self.configuration_steps[selected_file].get("jump_possible", {}))
        return {}

    def get_documentation_text_and_url(self, selected_file: str, prefix_key: str) -> tuple[str, str]:
        """
        Return the documentation text and URL stored under a key prefix for the selected file.

        Args:
            selected_file: The filename to get documentation for
            prefix_key: Prefix of the "<prefix_key>_text" and "<prefix_key>_url" entries

        Returns:
            (text, url). When no configuration steps are loaded, text explains that the steps file
            was not found and url is empty. When the text entry is missing, text says that no
            documentation is available; a missing url entry yields an empty string.

        """
        # None means "no configuration steps loaded at all", {} means "this file is not documented"
        documentation = self.configuration_steps.get(selected_file, {}) if self.configuration_steps else None
        if documentation is None:
            text = _(
                "File '{self.configuration_steps_filename}' not found. No intermediate parameter configuration steps available"
            )
            text = text.format(**locals())
            url = ""
        else:
            text = _("No documentation available for {selected_file} in the {self.configuration_steps_filename} file")
            text = documentation.get(prefix_key + "_text", text.format(**locals()))
            url = documentation.get(prefix_key + "_url", "")
        return text, url

    def get_seq_tooltip_text(self, selected_file: str, tooltip_key: str) -> str:
        """
        Return the tooltip text stored under tooltip_key for the selected file.

        Returns:
            The stored entry, or a fallback message when no configuration steps are loaded
            or when the entry is missing for this file

        """
        # None means "no configuration steps loaded at all", {} means "this file is not documented"
        documentation = self.configuration_steps.get(selected_file, {}) if self.configuration_steps else None
        if documentation is None:
            text = _(
                "File '{self.configuration_steps_filename}' not found. No intermediate parameter configuration steps available"
            )
            text = text.format(**locals())
        else:
            text = _("No documentation available for {selected_file} in the {self.configuration_steps_filename} file")
            text = documentation.get(tooltip_key, text.format(**locals()))
        return text

    def get_sorted_phases_with_end_and_weight(self, total_files: int) -> dict[str, PhaseData]:
        """
        Get sorted phases with added 'end' and 'weight' information.

        Phases without a 'start' key are left out.

        Returns phases sorted by start position, with each phase containing:
        - 'end': The end file number (start of next phase or total_files)
        - 'weight': Weight for UI layout (max(2, end - start))

        The 'end' and 'weight' keys are written into the phase dictionaries held in
        self.configuration_phases, not into copies.
        """
        active_phases = {k: v for k, v in self.configuration_phases.items() if "start" in v}

        # Sort phases by start position
        sorted_phases: dict[str, PhaseData] = dict(sorted(active_phases.items(), key=lambda x: x[1].get("start", 0)))

        # Add the end information to each phase using the start of the next phase
        phase_names = list(sorted_phases.keys())
        for i, phase_name in enumerate(phase_names):
            if i < len(phase_names) - 1:
                next_phase_name = phase_names[i + 1]
                sorted_phases[phase_name]["end"] = sorted_phases[next_phase_name].get("start", total_files)
            else:
                # The last phase runs until the last file
                sorted_phases[phase_name]["end"] = total_files
            phase_start = sorted_phases[phase_name].get("start", 0)
            phase_end = sorted_phases[phase_name].get("end", total_files)
            sorted_phases[phase_name]["weight"] = max(2, phase_end - phase_start)

        return sorted_phases

    def get_plugin(self, selected_file: str) -> Optional[dict]:
        """
        Get the plugin configuration for the selected file.

        Args:
            selected_file: The filename to get plugin info for

        Returns:
            The plugin dict with 'name' and 'placement' if exists, None otherwise

        """
        if selected_file in self.configuration_steps:
            return self.configuration_steps[selected_file].get("plugin")
        return None