Skip to content

Commit 388216c

Browse files
committed
refactor(param editor): Move business logic into data_model_configuration_step.py
1 parent 83dd4be commit 388216c

File tree

5 files changed

+1079
-317
lines changed

5 files changed

+1079
-317
lines changed

ardupilot_methodic_configurator/data_model_ardupilot_parameter.py

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -464,90 +464,3 @@ def get_value_from_keys(checked_keys: set[int]) -> str:
464464
465465
"""
466466
return str(sum(1 << key for key in checked_keys))
467-
468-
469-
class ConnectionRenamer:
470-
"""
471-
Utility class to handle renaming parameters based on connection prefixes.
472-
473-
This captures the logic for renaming parameters when connection types change,
474-
particularly handling special cases like CAN bus connections.
475-
"""
476-
477-
@staticmethod
478-
def generate_renames(parameters: list[str], new_connection_prefix: str) -> dict[str, str]:
479-
"""
480-
Generate a dictionary of parameter renames based on a new connection prefix.
481-
482-
Args:
483-
parameters: List of parameter names to potentially rename
484-
new_connection_prefix: The new prefix to apply (like "CAN2")
485-
486-
Returns:
487-
Dictionary mapping old parameter names to new parameter names
488-
489-
"""
490-
renames: dict[str, str] = {}
491-
492-
# Extract the type and number from the new connection prefix
493-
if len(new_connection_prefix) < 2:
494-
return renames
495-
496-
new_type = new_connection_prefix[:-1] # e.g., "CAN" from "CAN2"
497-
new_number = new_connection_prefix[-1] # e.g., "2" from "CAN2"
498-
499-
for param_name in parameters:
500-
new_prefix = new_connection_prefix
501-
old_prefix = param_name.split("_")[0]
502-
if new_type == "CAN" and "CAN_P" in param_name:
503-
old_prefix = param_name.split("_")[0] + "_" + param_name.split("_")[1]
504-
new_prefix = "CAN_P" + new_number
505-
if new_type == "CAN" and "CAN_D" in param_name:
506-
old_prefix = param_name.split("_")[0] + "_" + param_name.split("_")[1]
507-
new_prefix = "CAN_D" + new_number
508-
509-
if new_type in old_prefix:
510-
renames[param_name] = param_name.replace(old_prefix, new_prefix)
511-
512-
return renames
513-
514-
@staticmethod
515-
def apply_renames(
516-
parameters: dict[str, Any], new_connection_prefix: str, variables: Optional[dict[str, Any]] = None
517-
) -> tuple[set[str], list[tuple[str, str]]]:
518-
"""
519-
Apply connection prefix renames to a parameter dictionary.
520-
521-
Args:
522-
parameters: Dictionary of parameter objects to rename, it will modify it
523-
new_connection_prefix: The new prefix to apply
524-
variables: Optional dictionary of variables for evaluation
525-
526-
Returns:
527-
Tuple containing:
528-
- Set of duplicated parameter names that got removed
529-
- List of (old_name, new_name) pairs that were renamed
530-
531-
"""
532-
if variables:
533-
# If variables provided, evaluate the new_connection_prefix
534-
new_connection_prefix = eval(str(new_connection_prefix), {}, variables) # noqa: S307 pylint: disable=eval-used
535-
536-
# Generate the rename mapping
537-
renames = ConnectionRenamer.generate_renames(list(parameters.keys()), new_connection_prefix)
538-
539-
# Track unique new names and actual renames performed
540-
new_names = set()
541-
duplicates = set()
542-
renamed_pairs = []
543-
for old_name, new_name in renames.items():
544-
if new_name in new_names:
545-
parameters.pop(old_name)
546-
duplicates.add(old_name)
547-
else:
548-
new_names.add(new_name)
549-
if new_name != old_name:
550-
parameters[new_name] = parameters.pop(old_name)
551-
renamed_pairs.append((old_name, new_name))
552-
553-
return duplicates, renamed_pairs
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
"""
2+
Configuration step data model for parameter processing and domain model creation.
3+
4+
This file contains business logic for processing configuration steps, including parameter
5+
computation, domain model creation, and connection renaming operations.
6+
7+
This file is part of Ardupilot methodic configurator. https://github.com/ArduPilot/MethodicConfigurator
8+
9+
SPDX-FileCopyrightText: 2024-2025 Amilcar do Carmo Lucas <[email protected]>
10+
11+
SPDX-License-Identifier: GPL-3.0-or-later
12+
"""
13+
14+
from logging import info as logging_info
15+
from typing import Any, Optional
16+
17+
from ardupilot_methodic_configurator import _
18+
from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem
19+
from ardupilot_methodic_configurator.data_model_ardupilot_parameter import ArduPilotParameter
20+
21+
22+
class ConfigurationStepProcessor:
23+
"""
24+
Handles configuration step processing operations.
25+
26+
This class encapsulates the business logic for processing configuration steps,
27+
including parameter computation, domain model creation, and connection renaming.
28+
"""
29+
30+
def __init__(self, local_filesystem: LocalFilesystem) -> None:
31+
"""
32+
Initialize the configuration step processor.
33+
34+
Args:
35+
local_filesystem: The local filesystem instance to work with
36+
37+
"""
38+
self.local_filesystem = local_filesystem
39+
40+
def process_configuration_step(
41+
self,
42+
selected_file: str,
43+
fc_parameters: dict[str, float],
44+
variables: dict,
45+
) -> tuple[dict[str, ArduPilotParameter], bool, list[tuple[str, str]], list[tuple[str, str]]]:
46+
"""
47+
Process a configuration step including parameter computation and domain model creation.
48+
49+
Args:
50+
selected_file: The name of the selected parameter file
51+
fc_parameters: Dictionary of flight controller parameters
52+
variables: Variables dictionary for evaluation
53+
54+
Returns:
55+
Tuple containing:
56+
- Dictionary of ArduPilotParameter domain model objects
57+
- Boolean indicating if at least one parameter was edited
58+
- List of (title, message) tuples for UI error feedback
59+
- List of (title, message) tuples for UI info feedback
60+
61+
"""
62+
at_least_one_param_edited = False
63+
ui_errors: list[tuple[str, str]] = []
64+
ui_infos: list[tuple[str, str]] = []
65+
66+
# Process configuration step operations if configuration steps exist
67+
if self.local_filesystem.configuration_steps and selected_file in self.local_filesystem.configuration_steps:
68+
variables["fc_parameters"] = fc_parameters
69+
70+
# Compute derived parameters
71+
error_msg = self.local_filesystem.compute_parameters(
72+
selected_file, self.local_filesystem.configuration_steps[selected_file], "derived", variables
73+
)
74+
if error_msg:
75+
ui_errors.append((_("Error in derived parameters"), error_msg))
76+
# Merge derived parameter values
77+
elif self.local_filesystem.merge_forced_or_derived_parameters(
78+
selected_file, self.local_filesystem.derived_parameters, list(fc_parameters.keys())
79+
):
80+
at_least_one_param_edited = True
81+
82+
# Handle connection renaming
83+
connection_edited, ui_infos = self._handle_connection_renaming(selected_file, variables)
84+
if connection_edited:
85+
at_least_one_param_edited = True
86+
87+
# Create domain model parameters
88+
parameters = self._create_domain_model_parameters(selected_file, fc_parameters)
89+
90+
return parameters, at_least_one_param_edited, ui_errors, ui_infos
91+
92+
def _handle_connection_renaming(self, selected_file: str, variables: dict) -> tuple[bool, list[tuple[str, str]]]:
93+
"""
94+
Handle connection renaming operations for the selected file.
95+
96+
Args:
97+
selected_file: The name of the selected parameter file
98+
variables: Variables dictionary for evaluation
99+
100+
Returns:
101+
Tuple containing:
102+
- True if parameters were modified, False otherwise
103+
- List of (title, message) tuples for UI info feedback
104+
105+
"""
106+
if "rename_connection" not in self.local_filesystem.configuration_steps[selected_file]:
107+
return False, []
108+
109+
new_connection_prefix = self.local_filesystem.configuration_steps[selected_file]["rename_connection"]
110+
111+
# Apply renames to the parameters dictionary
112+
duplicated_parameters, renamed_pairs = self._apply_connection_renames(
113+
self.local_filesystem.file_parameters[selected_file], new_connection_prefix, variables
114+
)
115+
116+
at_least_one_param_edited = False
117+
ui_infos: list[tuple[str, str]] = []
118+
119+
# Handle duplicated parameters
120+
for old_name in duplicated_parameters:
121+
logging_info(_("Removing duplicate parameter %s"), old_name)
122+
info_msg = _("The parameter '{old_name}' was removed due to duplication.")
123+
ui_infos.append((_("Parameter Removed"), info_msg.format(**locals())))
124+
at_least_one_param_edited = True
125+
126+
# Handle renamed parameters
127+
for old_name, new_name in renamed_pairs:
128+
logging_info(_("Renaming parameter %s to %s"), old_name, new_name)
129+
info_msg = _(
130+
"The parameter '{old_name}' was renamed to '{new_name}'.\n"
131+
"to obey the flight controller connection defined in the component editor window."
132+
)
133+
ui_infos.append((_("Parameter Renamed"), info_msg.format(**locals())))
134+
at_least_one_param_edited = True
135+
136+
return at_least_one_param_edited, ui_infos
137+
138+
def _create_domain_model_parameters(
139+
self, selected_file: str, fc_parameters: dict[str, float]
140+
) -> dict[str, ArduPilotParameter]:
141+
"""
142+
Create ArduPilotParameter domain model objects for each parameter in the file.
143+
144+
Args:
145+
selected_file: The name of the selected parameter file
146+
fc_parameters: Dictionary of flight controller parameters
147+
148+
Returns:
149+
Dictionary mapping parameter names to ArduPilotParameter objects
150+
151+
"""
152+
parameters: dict[str, ArduPilotParameter] = {}
153+
154+
for param_name, param in self.local_filesystem.file_parameters[selected_file].items():
155+
# Get parameter metadata and default values
156+
metadata = self.local_filesystem.doc_dict.get(param_name, {})
157+
default_par = self.local_filesystem.param_default_dict.get(param_name, None)
158+
159+
# Check if parameter is forced or derived
160+
forced_par = self.local_filesystem.forced_parameters.get(selected_file, {}).get(param_name, None)
161+
derived_par = self.local_filesystem.derived_parameters.get(selected_file, {}).get(param_name, None)
162+
163+
# Get FC value if available
164+
fc_value = fc_parameters.get(param_name)
165+
166+
# Create domain model parameter
167+
parameters[param_name] = ArduPilotParameter(
168+
param_name, param, metadata, default_par, fc_value, forced_par, derived_par
169+
)
170+
171+
return parameters
172+
173+
def filter_different_parameters(self, parameters: dict[str, ArduPilotParameter]) -> dict[str, ArduPilotParameter]:
174+
"""
175+
Filter parameters to only include those that are different from FC values or missing from FC.
176+
177+
Args:
178+
parameters: Dictionary of all parameters
179+
180+
Returns:
181+
Dictionary of parameters that are different from FC
182+
183+
"""
184+
return {name: param for name, param in parameters.items() if param.is_different_from_fc or not param.has_fc_value}
185+
186+
@staticmethod
187+
def _generate_connection_renames(parameters: list[str], new_connection_prefix: str) -> dict[str, str]:
188+
"""
189+
Generate a dictionary of parameter renames based on a new connection prefix.
190+
191+
Args:
192+
parameters: List of parameter names to potentially rename
193+
new_connection_prefix: The new prefix to apply (like "CAN2")
194+
195+
Returns:
196+
Dictionary mapping old parameter names to new parameter names
197+
198+
"""
199+
renames: dict[str, str] = {}
200+
201+
# Extract the type and number from the new connection prefix
202+
if len(new_connection_prefix) < 2:
203+
return renames
204+
205+
new_type = new_connection_prefix[:-1] # e.g., "CAN" from "CAN2"
206+
new_number = new_connection_prefix[-1] # e.g., "2" from "CAN2"
207+
208+
for param_name in parameters:
209+
new_prefix = new_connection_prefix
210+
old_prefix = param_name.split("_")[0]
211+
if new_type == "CAN" and "CAN_P" in param_name:
212+
old_prefix = param_name.split("_")[0] + "_" + param_name.split("_")[1]
213+
new_prefix = "CAN_P" + new_number
214+
if new_type == "CAN" and "CAN_D" in param_name:
215+
old_prefix = param_name.split("_")[0] + "_" + param_name.split("_")[1]
216+
new_prefix = "CAN_D" + new_number
217+
218+
if new_type in old_prefix:
219+
renames[param_name] = param_name.replace(old_prefix, new_prefix)
220+
221+
return renames
222+
223+
@staticmethod
224+
def _apply_connection_renames(
225+
parameters: dict[str, Any], new_connection_prefix: str, variables: Optional[dict[str, Any]] = None
226+
) -> tuple[set[str], list[tuple[str, str]]]:
227+
"""
228+
Apply connection prefix renames to a parameter dictionary.
229+
230+
Args:
231+
parameters: Dictionary of parameter objects to rename, it will modify it
232+
new_connection_prefix: The new prefix to apply
233+
variables: Optional dictionary of variables for evaluation
234+
235+
Returns:
236+
Tuple containing:
237+
- Set of duplicated parameter names that got removed
238+
- List of (old_name, new_name) pairs that were renamed
239+
240+
"""
241+
if variables:
242+
# If variables provided, evaluate the new_connection_prefix
243+
new_connection_prefix = eval(str(new_connection_prefix), {}, variables) # noqa: S307 pylint: disable=eval-used
244+
245+
# Generate the rename mapping
246+
renames = ConfigurationStepProcessor._generate_connection_renames(list(parameters.keys()), new_connection_prefix)
247+
248+
# Track unique new names and actual renames performed
249+
new_names = set()
250+
duplicates = set()
251+
renamed_pairs = []
252+
for old_name, new_name in renames.items():
253+
if new_name in new_names:
254+
parameters.pop(old_name)
255+
duplicates.add(old_name)
256+
else:
257+
new_names.add(new_name)
258+
if new_name != old_name:
259+
parameters[new_name] = parameters.pop(old_name)
260+
renamed_pairs.append((old_name, new_name))
261+
262+
return duplicates, renamed_pairs

0 commit comments

Comments
 (0)