|
| 1 | +from typing import Annotated, Literal |
| 2 | + |
| 3 | +from pydantic import ( |
| 4 | + BaseModel, |
| 5 | + ConfigDict, |
| 6 | + Field, |
| 7 | + NonNegativeInt, |
| 8 | + RootModel, |
| 9 | + Strict, |
| 10 | + StrictBool, |
| 11 | + StrictFloat, |
| 12 | + StrictInt, |
| 13 | +) |
| 14 | + |
| 15 | +from cogip.protobuf import ( |
| 16 | + PB_ParameterGetRequest, |
| 17 | + PB_ParameterGetResponse, |
| 18 | + PB_ParameterSetRequest, |
| 19 | + PB_ParameterSetResponse, |
| 20 | + PB_ParameterStatus, |
| 21 | +) |
| 22 | + |
| 23 | + |
| 24 | +class FirmwareParameterValidationFailed(Exception): |
| 25 | + """Exception raised when firmware parameter validation fails. |
| 26 | +
|
| 27 | + This exception is raised when a firmware parameter value does not meet |
| 28 | + the validation constraints defined for that parameter value range on the embedded side. |
| 29 | + """ |
| 30 | + |
| 31 | + pass |
| 32 | + |
| 33 | + |
| 34 | +class FirmwareParameterNotFound(Exception): |
| 35 | + """Exception raised when a requested firmware parameter is not found. |
| 36 | +
|
| 37 | + This exception is raised when trying to access or modify a firmware parameter |
| 38 | + that does not exist in the parameter registry on the embedded side. |
| 39 | + """ |
| 40 | + |
| 41 | + pass |
| 42 | + |
| 43 | + |
| 44 | +def fnv1a_hash(string: str) -> int: |
| 45 | + """Compute FNV-1a hash of a string. |
| 46 | +
|
| 47 | + Args: |
| 48 | + string: The string to hash |
| 49 | +
|
| 50 | + Returns: |
| 51 | + The 32-bit hash value as an unsigned integer |
| 52 | +
|
| 53 | + Example: |
| 54 | + >>> hex(fnv1a_hash("parameter")) |
| 55 | + '0x100b' |
| 56 | + """ |
| 57 | + # FNV-1a constants |
| 58 | + FNV_OFFSET_BASIS = 0x811C9DC5 |
| 59 | + FNV_PRIME = 0x01000193 |
| 60 | + |
| 61 | + hash_value = FNV_OFFSET_BASIS |
| 62 | + |
| 63 | + for byte in string.encode("utf-8"): |
| 64 | + hash_value ^= byte |
| 65 | + hash_value = (hash_value * FNV_PRIME) & 0xFFFFFFFF # Keep it 32-bit |
| 66 | + |
| 67 | + return hash_value |
| 68 | + |
| 69 | + |
| 70 | +class FirmwareParameterBase(BaseModel): |
| 71 | + """Base firmware parameter type""" |
| 72 | + |
| 73 | + model_config = ConfigDict(validate_assignment=True) |
| 74 | + |
| 75 | + |
| 76 | +class FirmwareParameterFloat(FirmwareParameterBase): |
| 77 | + """Float firmware parameter value.""" |
| 78 | + |
| 79 | + type: Literal["float"] = "float" |
| 80 | + content: StrictFloat |
| 81 | + |
| 82 | + |
| 83 | +class FirmwareParameterDouble(FirmwareParameterBase): |
| 84 | + """Double firmware parameter value.""" |
| 85 | + |
| 86 | + type: Literal["double"] = "double" |
| 87 | + content: StrictFloat |
| 88 | + |
| 89 | + |
| 90 | +class FirmwareParameterInt32(FirmwareParameterBase): |
| 91 | + """Signed 32-bit integer firmware parameter value.""" |
| 92 | + |
| 93 | + type: Literal["int32"] = "int32" |
| 94 | + content: StrictInt |
| 95 | + |
| 96 | + |
| 97 | +class FirmwareParameterUInt32(FirmwareParameterBase): |
| 98 | + """Unsigned 32-bit integer firmware parameter value.""" |
| 99 | + |
| 100 | + type: Literal["uint32"] = "uint32" |
| 101 | + content: Annotated[NonNegativeInt, Strict()] |
| 102 | + |
| 103 | + |
| 104 | +class FirmwareParameterInt64(FirmwareParameterBase): |
| 105 | + """Signed 64-bit integer firmware parameter value.""" |
| 106 | + |
| 107 | + type: Literal["int64"] = "int64" |
| 108 | + content: StrictInt |
| 109 | + |
| 110 | + |
| 111 | +class FirmwareParameterUInt64(FirmwareParameterBase): |
| 112 | + """Unsigned 64-bit integer firmware parameter value.""" |
| 113 | + |
| 114 | + type: Literal["uint64"] = "uint64" |
| 115 | + content: Annotated[NonNegativeInt, Strict()] |
| 116 | + |
| 117 | + |
| 118 | +class FirmwareParameterBool(FirmwareParameterBase): |
| 119 | + """Boolean firmware parameter value.""" |
| 120 | + |
| 121 | + type: Literal["bool"] = "bool" |
| 122 | + content: StrictBool |
| 123 | + |
| 124 | + |
| 125 | +# Discriminated union of all firmware parameter value types |
| 126 | +FirmwareParameterValueType = ( |
| 127 | + FirmwareParameterFloat |
| 128 | + | FirmwareParameterDouble |
| 129 | + | FirmwareParameterInt32 |
| 130 | + | FirmwareParameterUInt32 |
| 131 | + | FirmwareParameterInt64 |
| 132 | + | FirmwareParameterUInt64 |
| 133 | + | FirmwareParameterBool |
| 134 | +) |
| 135 | + |
| 136 | + |
| 137 | +class FirmwareParameter(BaseModel): |
| 138 | + """Firmware parameter model with discriminated union for type-safe values. |
| 139 | +
|
| 140 | + Attributes: |
| 141 | + name: The firmware parameter name |
| 142 | + value: The firmware parameter content value (float, int, or bool) |
| 143 | + value_obj: The firmware parameter value object with its type |
| 144 | + """ |
| 145 | + |
| 146 | + model_config = ConfigDict(validate_assignment=True) |
| 147 | + |
| 148 | + name: str |
| 149 | + value_obj: Annotated[FirmwareParameterValueType, Field(alias="value", discriminator="type")] |
| 150 | + |
| 151 | + def __hash__(self): |
| 152 | + return fnv1a_hash(self.name) |
| 153 | + |
| 154 | + @property |
| 155 | + def value(self) -> float | int | bool: |
| 156 | + """Get the firmware parameter content value. |
| 157 | +
|
| 158 | + Returns: |
| 159 | + The actual content value (float, int, or bool) |
| 160 | + """ |
| 161 | + return self.value_obj.content |
| 162 | + |
| 163 | + @value.setter |
| 164 | + def value(self, content: float | int | bool) -> None: |
| 165 | + """Set the firmware parameter content value. |
| 166 | +
|
| 167 | + Args: |
| 168 | + content: The new content value to set |
| 169 | +
|
| 170 | + Note: |
| 171 | + The type of the firmware parameter remains unchanged. The content must be |
| 172 | + compatible with the existing parameter type. |
| 173 | + """ |
| 174 | + self.value_obj.content = content |
| 175 | + |
| 176 | + def pb_copy(self, message: PB_ParameterSetRequest | PB_ParameterGetRequest) -> None: |
| 177 | + """Copy values to Protobuf message""" |
| 178 | + message.key_hash = hash(self) |
| 179 | + |
| 180 | + if isinstance(message, PB_ParameterSetRequest): |
| 181 | + setattr(message.value, f"{self.value_obj.type}_value", self.value_obj.content) |
| 182 | + |
| 183 | + def pb_read(self, message: PB_ParameterSetResponse | PB_ParameterGetResponse) -> None: |
| 184 | + """Read values from Protobuf message and update firmware parameter content. |
| 185 | +
|
| 186 | + Args: |
| 187 | + message: The ParameterSetResponse or ParameterGetResponse containing the value to read |
| 188 | +
|
| 189 | + Raises: |
| 190 | + ValueError: If the key_hash doesn't match the firmware parameter name or no value set |
| 191 | + FirmwareParameterValidationFailed: If firmware parameter validation failed on the embedded side |
| 192 | + FirmwareParameterReadOnly: If firmware parameter is read-only on the embedded side |
| 193 | + FirmwareParameterNotFound: If firmware parameter not found on the embedded side |
| 194 | + """ |
| 195 | + |
| 196 | + # Verify that the name matches |
| 197 | + if message.key_hash != hash(self): |
| 198 | + raise ValueError(f"Key hash mismatch: expected '{hash(self)}', got '{message.key_hash}'") |
| 199 | + |
| 200 | + if isinstance(message, PB_ParameterSetResponse): |
| 201 | + # Check status and raise appropriate exceptions |
| 202 | + match message.status: |
| 203 | + case PB_ParameterStatus.VALIDATION_FAILED: |
| 204 | + raise FirmwareParameterValidationFailed(f"Firmware parameter '{self.name}' validation failed") |
| 205 | + case PB_ParameterStatus.NOT_FOUND: |
| 206 | + raise FirmwareParameterNotFound(f"Firmware parameter '{self.name}' not found in registry") |
| 207 | + case PB_ParameterStatus.SUCCESS: |
| 208 | + pass # Operation succeeded, nothing to do |
| 209 | + elif isinstance(message, PB_ParameterGetResponse): |
| 210 | + # Get the name of the field defined in the oneof |
| 211 | + which_field = message.value.WhichOneof("value") |
| 212 | + |
| 213 | + if which_field is None: |
| 214 | + raise ValueError("No value set in ParameterGetResponse, firmware parameter not found") |
| 215 | + |
| 216 | + # Get the value of the active field |
| 217 | + content = getattr(message.value, which_field) |
| 218 | + |
| 219 | + # Update the firmware parameter content |
| 220 | + self.value_obj.content = content |
| 221 | + |
| 222 | + |
| 223 | +class FirmwareParametersGroup(RootModel): |
| 224 | + """Container for a group of firmware parameters with name-based access. |
| 225 | +
|
| 226 | + This class manages a collection of firmware parameters and provides convenient |
| 227 | + get/set methods using firmware parameter names. |
| 228 | +
|
| 229 | + The model directly represents a list of FirmwareParameter objects. |
| 230 | + """ |
| 231 | + |
| 232 | + root: Annotated[list[FirmwareParameter], Field(default_factory=list)] |
| 233 | + |
| 234 | + def model_post_init(self, __context) -> None: |
| 235 | + """Build index after initialization.""" |
| 236 | + super().model_post_init(__context) |
| 237 | + self._rebuild_index() |
| 238 | + |
| 239 | + def _rebuild_index(self) -> None: |
| 240 | + """Rebuild the internal index for firmware parameter name lookup.""" |
| 241 | + self._index: dict[str, int] = {param.name: idx for idx, param in enumerate(self.root)} |
| 242 | + |
| 243 | + def get(self, name: str) -> FirmwareParameter: |
| 244 | + """Get a firmware parameter by its name. |
| 245 | +
|
| 246 | + Args: |
| 247 | + name: The firmware parameter name |
| 248 | +
|
| 249 | + Returns: |
| 250 | + The FirmwareParameter object |
| 251 | +
|
| 252 | + Raises: |
| 253 | + KeyError: If the firmware parameter name is not found |
| 254 | + """ |
| 255 | + if name not in self._index: |
| 256 | + raise KeyError(f"Firmware parameter '{name}' not found") |
| 257 | + return self.root[self._index[name]] |
| 258 | + |
| 259 | + def __contains__(self, name: str) -> bool: |
| 260 | + """Check if a firmware parameter name exists in the list. |
| 261 | +
|
| 262 | + Args: |
| 263 | + name: The firmware parameter name to check |
| 264 | +
|
| 265 | + Returns: |
| 266 | + True if the firmware parameter exists, False otherwise |
| 267 | + """ |
| 268 | + return name in self._index |
| 269 | + |
| 270 | + def __getitem__(self, name: str) -> float | int | bool: |
| 271 | + """Get a firmware parameter's value using bracket notation. |
| 272 | +
|
| 273 | + Args: |
| 274 | + name: The firmware parameter name |
| 275 | +
|
| 276 | + Returns: |
| 277 | + The firmware parameter's content value |
| 278 | +
|
| 279 | + Raises: |
| 280 | + KeyError: If the firmware parameter name is not found |
| 281 | + """ |
| 282 | + return self.get(name).value |
| 283 | + |
| 284 | + def __setitem__(self, name: str, value: float | int | bool) -> None: |
| 285 | + """Set a firmware parameter's value using bracket notation. |
| 286 | +
|
| 287 | + Args: |
| 288 | + name: The firmware parameter name |
| 289 | + value: The new content value |
| 290 | +
|
| 291 | + Raises: |
| 292 | + KeyError: If the firmware parameter name is not found |
| 293 | + """ |
| 294 | + self.get(name).value = value |
| 295 | + |
| 296 | + def __len__(self) -> int: |
| 297 | + """Return the number of firmware parameters in the list.""" |
| 298 | + return len(self.root) |
| 299 | + |
| 300 | + def __iter__(self): |
| 301 | + """Iterate over all firmware parameters.""" |
| 302 | + return iter(self.root) |
0 commit comments