diff --git a/cogip/models/__init__.py b/cogip/models/__init__.py index db568682c..306ffac11 100644 --- a/cogip/models/__init__.py +++ b/cogip/models/__init__.py @@ -1,3 +1,14 @@ +from .firmware_parameter import ( # noqa + FirmwareParameter, + FirmwareParameterNotFound, + FirmwareParametersGroup, + FirmwareParameterValidationFailed, +) +from .firmware_telemetry import ( # noqa + TelemetryData, + TelemetryDict, + TelemetryValue, +) from .models import ( # noqa CameraExtrinsicParameters, DynObstacle, @@ -12,3 +23,9 @@ ShellMenu, Vertex, ) +from .odometry_calibration import ( # noqa + CalibrationResult, + CalibrationState, + EncoderDeltas, + OdometryParameters, +) diff --git a/cogip/models/firmware_parameter.py b/cogip/models/firmware_parameter.py index 2e09e9181..722802991 100644 --- a/cogip/models/firmware_parameter.py +++ b/cogip/models/firmware_parameter.py @@ -19,6 +19,7 @@ PB_ParameterSetResponse, PB_ParameterStatus, ) +from cogip.utils.fnv1a import fnv1a_hash class FirmwareParameterValidationFailed(Exception): @@ -41,32 +42,6 @@ class FirmwareParameterNotFound(Exception): pass -def fnv1a_hash(string: str) -> int: - """Compute FNV-1a hash of a string. - - Args: - string: The string to hash - - Returns: - The 32-bit hash value as an unsigned integer - - Example: - >>> hex(fnv1a_hash("parameter")) - '0x100b' - """ - # FNV-1a constants - FNV_OFFSET_BASIS = 0x811C9DC5 - FNV_PRIME = 0x01000193 - - hash_value = FNV_OFFSET_BASIS - - for byte in string.encode("utf-8"): - hash_value ^= byte - hash_value = (hash_value * FNV_PRIME) & 0xFFFFFFFF # Keep it 32-bit - - return hash_value - - class FirmwareParameterBase(BaseModel): """Base firmware parameter type""" diff --git a/cogip/models/firmware_telemetry.py b/cogip/models/firmware_telemetry.py new file mode 100644 index 000000000..afa3595fb --- /dev/null +++ b/cogip/models/firmware_telemetry.py @@ -0,0 +1,132 @@ +""" +Firmware Telemetry Models. + +This module provides models for parsing generic telemetry data received +from the robot's MCU firmware via Protobuf messages using FNV-1a key hashes. +""" + +from pydantic import BaseModel, ConfigDict + +from cogip.protobuf import PB_TelemetryData +from cogip.utils.fnv1a import fnv1a_hash + +# Discriminated union of all firmware telemetry value types +TelemetryValue = float | int + + +class TelemetryData(BaseModel): + """ + Generic telemetry data point with key hash, timestamp, and value. + + Attributes: + key_hash: FNV-1a hash of the telemetry key. + timestamp_ms: Timestamp in milliseconds when the data was captured. + value: The telemetry value (float or int depending on type). + """ + + model_config = ConfigDict(validate_assignment=True) + + key_hash: int + timestamp_ms: int + value: TelemetryValue + + @classmethod + def from_protobuf(cls, message: PB_TelemetryData) -> "TelemetryData": + """ + Parse a TelemetryData from a PB_TelemetryData protobuf message. + + Args: + message: The PB_TelemetryData protobuf message. + + Returns: + TelemetryData instance with parsed values. + """ + key_hash = message.key_hash + timestamp_ms = message.timestamp_ms + + # Extract the value from the oneof field + which_value = message.WhichOneof("value") + if which_value is None: + value: TelemetryValue = 0 + else: + value = getattr(message, which_value) + + return cls(key_hash=key_hash, timestamp_ms=timestamp_ms, value=value) + + +class TelemetryDict: + """ + Dict-like store for telemetry data points indexed by key hash. + + This class collects telemetry data and provides access by key name. + """ + + def __init__(self): + self._data: dict[int, TelemetryData] = {} + + def update(self, data: TelemetryData) -> None: + """ + Update the store with a new telemetry data point. + + Args: + data: The telemetry data point to store. + """ + self._data[data.key_hash] = data + + def get_model(self, key: str) -> TelemetryData: + """ + Get telemetry data model by key name. + + Args: + key: The telemetry key name to look up. + + Returns: + TelemetryData for the key. + + Raises: + KeyError: If the key is not found in the store. + """ + return self._data[fnv1a_hash(key)] + + def __getitem__(self, key: str) -> TelemetryValue: + """ + Get telemetry value by key name. Raises KeyError if not found. + + Args: + key: The telemetry key name to look up. + + Returns: + The telemetry value for the key. + + Raises: + KeyError: If the key is not found in the store. + """ + return self._data[fnv1a_hash(key)].value + + def __contains__(self, key: str) -> bool: + """ + Check if a key exists in the store. + + Args: + key: The telemetry key name to check. + + Returns: + True if the key exists, False otherwise. + """ + return fnv1a_hash(key) in self._data + + def __len__(self) -> int: + """Return the number of telemetry entries.""" + return len(self._data) + + def __bool__(self) -> bool: + """Return True if the store contains any data.""" + return bool(self._data) + + def items(self): + """Iterate over (key_hash, TelemetryData) tuples.""" + return self._data.items() + + def values(self): + """Iterate over TelemetryData values.""" + return self._data.values() diff --git a/cogip/models/odometry_calibration.py b/cogip/models/odometry_calibration.py new file mode 100644 index 000000000..3155c9d0a --- /dev/null +++ b/cogip/models/odometry_calibration.py @@ -0,0 +1,49 @@ +""" +Odometry Calibration Models + +Pydantic models for odometry calibration data. +""" + +from pydantic import BaseModel + + +class CalibrationResult(BaseModel): + """Result of a calibration computation.""" + + wheels_distance: float + right_wheel_radius: float + left_wheel_radius: float + + +class CalibrationState(BaseModel): + """ + State container for calibration intermediate values. + + Tracks alpha and beta coefficients computed during calibration phases. + """ + + alpha_l: float = 0.0 + alpha_r: float = 0.0 + beta: float = 0.0 + + +class EncoderDeltas(BaseModel): + """Encoder tick deltas captured during a motion sequence.""" + + left: int + right: int + + +class OdometryParameters(BaseModel): + """ + Container for odometry parameters. + + Holds all parameters needed for odometry calibration. + """ + + wheels_distance: float = 0.0 + right_wheel_radius: float = 0.0 + left_wheel_radius: float = 0.0 + left_polarity: float = 0.0 + right_polarity: float = 0.0 + encoder_ticks: float = 0.0 diff --git a/cogip/protobuf/PB_ParameterCommands_pb2.py b/cogip/protobuf/PB_ParameterCommands_pb2.py index 8d4a2d852..a8c0c229c 100644 --- a/cogip/protobuf/PB_ParameterCommands_pb2.py +++ b/cogip/protobuf/PB_ParameterCommands_pb2.py @@ -1,11 +1,22 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE # source: PB_ParameterCommands.proto +# Protobuf Python Version: 5.28.0 """Generated protocol buffer code.""" -from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 0, + '', + 'PB_ParameterCommands.proto' +) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -15,21 +26,21 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1aPB_ParameterCommands.proto\"\xbf\x01\n\x11PB_ParameterValue\x12\x15\n\x0b\x66loat_value\x18\x01 \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\x02 \x01(\x01H\x00\x12\x15\n\x0bint32_value\x18\x03 \x01(\x05H\x00\x12\x16\n\x0cuint32_value\x18\x04 \x01(\rH\x00\x12\x15\n\x0bint64_value\x18\x05 \x01(\x03H\x00\x12\x16\n\x0cuint64_value\x18\x06 \x01(\x04H\x00\x12\x14\n\nbool_value\x18\x07 \x01(\x08H\x00\x42\x07\n\x05value\"*\n\x16PB_ParameterGetRequest\x12\x10\n\x08key_hash\x18\x01 \x01(\r\"N\n\x17PB_ParameterGetResponse\x12\x10\n\x08key_hash\x18\x01 \x01(\r\x12!\n\x05value\x18\x02 \x01(\x0b\x32\x12.PB_ParameterValue\"M\n\x16PB_ParameterSetRequest\x12\x10\n\x08key_hash\x18\x01 \x01(\r\x12!\n\x05value\x18\x02 \x01(\x0b\x32\x12.PB_ParameterValue\"P\n\x17PB_ParameterSetResponse\x12\x10\n\x08key_hash\x18\x01 \x01(\r\x12#\n\x06status\x18\x02 \x01(\x0e\x32\x13.PB_ParameterStatus*G\n\x12PB_ParameterStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\x15\n\x11VALIDATION_FAILED\x10\x01\x12\r\n\tNOT_FOUND\x10\x02\x62\x06proto3') -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'PB_ParameterCommands_pb2', globals()) -if _descriptor._USE_C_DESCRIPTORS == False: - - DESCRIPTOR._options = None - _PB_PARAMETERSTATUS._serialized_start=509 - _PB_PARAMETERSTATUS._serialized_end=580 - _PB_PARAMETERVALUE._serialized_start=31 - _PB_PARAMETERVALUE._serialized_end=222 - _PB_PARAMETERGETREQUEST._serialized_start=224 - _PB_PARAMETERGETREQUEST._serialized_end=266 - _PB_PARAMETERGETRESPONSE._serialized_start=268 - _PB_PARAMETERGETRESPONSE._serialized_end=346 - _PB_PARAMETERSETREQUEST._serialized_start=348 - _PB_PARAMETERSETREQUEST._serialized_end=425 - _PB_PARAMETERSETRESPONSE._serialized_start=427 - _PB_PARAMETERSETRESPONSE._serialized_end=507 +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'PB_ParameterCommands_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_PB_PARAMETERSTATUS']._serialized_start=509 + _globals['_PB_PARAMETERSTATUS']._serialized_end=580 + _globals['_PB_PARAMETERVALUE']._serialized_start=31 + _globals['_PB_PARAMETERVALUE']._serialized_end=222 + _globals['_PB_PARAMETERGETREQUEST']._serialized_start=224 + _globals['_PB_PARAMETERGETREQUEST']._serialized_end=266 + _globals['_PB_PARAMETERGETRESPONSE']._serialized_start=268 + _globals['_PB_PARAMETERGETRESPONSE']._serialized_end=346 + _globals['_PB_PARAMETERSETREQUEST']._serialized_start=348 + _globals['_PB_PARAMETERSETREQUEST']._serialized_end=425 + _globals['_PB_PARAMETERSETRESPONSE']._serialized_start=427 + _globals['_PB_PARAMETERSETRESPONSE']._serialized_end=507 # @@protoc_insertion_point(module_scope) diff --git a/cogip/protobuf/PB_ParameterCommands_pb2.pyi b/cogip/protobuf/PB_ParameterCommands_pb2.pyi index 3986273a6..e73b519f2 100644 --- a/cogip/protobuf/PB_ParameterCommands_pb2.pyi +++ b/cogip/protobuf/PB_ParameterCommands_pb2.pyi @@ -4,18 +4,42 @@ from google.protobuf import message as _message from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor -NOT_FOUND: PB_ParameterStatus + +class PB_ParameterStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + SUCCESS: _ClassVar[PB_ParameterStatus] + VALIDATION_FAILED: _ClassVar[PB_ParameterStatus] + NOT_FOUND: _ClassVar[PB_ParameterStatus] SUCCESS: PB_ParameterStatus VALIDATION_FAILED: PB_ParameterStatus +NOT_FOUND: PB_ParameterStatus + +class PB_ParameterValue(_message.Message): + __slots__ = ("float_value", "double_value", "int32_value", "uint32_value", "int64_value", "uint64_value", "bool_value") + FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int] + DOUBLE_VALUE_FIELD_NUMBER: _ClassVar[int] + INT32_VALUE_FIELD_NUMBER: _ClassVar[int] + UINT32_VALUE_FIELD_NUMBER: _ClassVar[int] + INT64_VALUE_FIELD_NUMBER: _ClassVar[int] + UINT64_VALUE_FIELD_NUMBER: _ClassVar[int] + BOOL_VALUE_FIELD_NUMBER: _ClassVar[int] + float_value: float + double_value: float + int32_value: int + uint32_value: int + int64_value: int + uint64_value: int + bool_value: bool + def __init__(self, float_value: _Optional[float] = ..., double_value: _Optional[float] = ..., int32_value: _Optional[int] = ..., uint32_value: _Optional[int] = ..., int64_value: _Optional[int] = ..., uint64_value: _Optional[int] = ..., bool_value: bool = ...) -> None: ... class PB_ParameterGetRequest(_message.Message): - __slots__ = ["key_hash"] + __slots__ = ("key_hash",) KEY_HASH_FIELD_NUMBER: _ClassVar[int] key_hash: int def __init__(self, key_hash: _Optional[int] = ...) -> None: ... class PB_ParameterGetResponse(_message.Message): - __slots__ = ["key_hash", "value"] + __slots__ = ("key_hash", "value") KEY_HASH_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] key_hash: int @@ -23,7 +47,7 @@ class PB_ParameterGetResponse(_message.Message): def __init__(self, key_hash: _Optional[int] = ..., value: _Optional[_Union[PB_ParameterValue, _Mapping]] = ...) -> None: ... class PB_ParameterSetRequest(_message.Message): - __slots__ = ["key_hash", "value"] + __slots__ = ("key_hash", "value") KEY_HASH_FIELD_NUMBER: _ClassVar[int] VALUE_FIELD_NUMBER: _ClassVar[int] key_hash: int @@ -31,30 +55,9 @@ class PB_ParameterSetRequest(_message.Message): def __init__(self, key_hash: _Optional[int] = ..., value: _Optional[_Union[PB_ParameterValue, _Mapping]] = ...) -> None: ... class PB_ParameterSetResponse(_message.Message): - __slots__ = ["key_hash", "status"] + __slots__ = ("key_hash", "status") KEY_HASH_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] key_hash: int status: PB_ParameterStatus def __init__(self, key_hash: _Optional[int] = ..., status: _Optional[_Union[PB_ParameterStatus, str]] = ...) -> None: ... - -class PB_ParameterValue(_message.Message): - __slots__ = ["bool_value", "double_value", "float_value", "int32_value", "int64_value", "uint32_value", "uint64_value"] - BOOL_VALUE_FIELD_NUMBER: _ClassVar[int] - DOUBLE_VALUE_FIELD_NUMBER: _ClassVar[int] - FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int] - INT32_VALUE_FIELD_NUMBER: _ClassVar[int] - INT64_VALUE_FIELD_NUMBER: _ClassVar[int] - UINT32_VALUE_FIELD_NUMBER: _ClassVar[int] - UINT64_VALUE_FIELD_NUMBER: _ClassVar[int] - bool_value: bool - double_value: float - float_value: float - int32_value: int - int64_value: int - uint32_value: int - uint64_value: int - def __init__(self, float_value: _Optional[float] = ..., double_value: _Optional[float] = ..., int32_value: _Optional[int] = ..., uint32_value: _Optional[int] = ..., int64_value: _Optional[int] = ..., uint64_value: _Optional[int] = ..., bool_value: bool = ...) -> None: ... - -class PB_ParameterStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): - __slots__ = [] diff --git a/cogip/protobuf/PB_Telemetry.proto b/cogip/protobuf/PB_Telemetry.proto new file mode 120000 index 000000000..43fea1c59 --- /dev/null +++ b/cogip/protobuf/PB_Telemetry.proto @@ -0,0 +1 @@ +../../submodules/mcu-firmware/lib/telemetry/PB_Telemetry.proto \ No newline at end of file diff --git a/cogip/protobuf/PB_Telemetry_pb2.py b/cogip/protobuf/PB_Telemetry_pb2.py new file mode 100644 index 000000000..3d1bf0913 --- /dev/null +++ b/cogip/protobuf/PB_Telemetry_pb2.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: PB_Telemetry.proto +# Protobuf Python Version: 5.28.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 0, + '', + 'PB_Telemetry.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12PB_Telemetry.proto\"\xb8\x01\n\x10PB_TelemetryData\x12\x10\n\x08key_hash\x18\x01 \x01(\r\x12\x14\n\x0ctimestamp_ms\x18\x02 \x01(\r\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x02H\x00\x12\x15\n\x0bint32_value\x18\x05 \x01(\x05H\x00\x12\x16\n\x0cuint32_value\x18\x06 \x01(\rH\x00\x12\x15\n\x0bint64_value\x18\x07 \x01(\x03H\x00\x12\x16\n\x0cuint64_value\x18\x08 \x01(\x04H\x00\x42\x07\n\x05valueb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'PB_Telemetry_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_PB_TELEMETRYDATA']._serialized_start=23 + _globals['_PB_TELEMETRYDATA']._serialized_end=207 +# @@protoc_insertion_point(module_scope) diff --git a/cogip/protobuf/PB_Telemetry_pb2.pyi b/cogip/protobuf/PB_Telemetry_pb2.pyi new file mode 100644 index 000000000..cb4370384 --- /dev/null +++ b/cogip/protobuf/PB_Telemetry_pb2.pyi @@ -0,0 +1,23 @@ +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional + +DESCRIPTOR: _descriptor.FileDescriptor + +class PB_TelemetryData(_message.Message): + __slots__ = ("key_hash", "timestamp_ms", "float_value", "int32_value", "uint32_value", "int64_value", "uint64_value") + KEY_HASH_FIELD_NUMBER: _ClassVar[int] + TIMESTAMP_MS_FIELD_NUMBER: _ClassVar[int] + FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int] + INT32_VALUE_FIELD_NUMBER: _ClassVar[int] + UINT32_VALUE_FIELD_NUMBER: _ClassVar[int] + INT64_VALUE_FIELD_NUMBER: _ClassVar[int] + UINT64_VALUE_FIELD_NUMBER: _ClassVar[int] + key_hash: int + timestamp_ms: int + float_value: float + int32_value: int + uint32_value: int + int64_value: int + uint64_value: int + def __init__(self, key_hash: _Optional[int] = ..., timestamp_ms: _Optional[int] = ..., float_value: _Optional[float] = ..., int32_value: _Optional[int] = ..., uint32_value: _Optional[int] = ..., int64_value: _Optional[int] = ..., uint64_value: _Optional[int] = ...) -> None: ... diff --git a/cogip/protobuf/__init__.py b/cogip/protobuf/__init__.py index dc5b0c185..96853784d 100644 --- a/cogip/protobuf/__init__.py +++ b/cogip/protobuf/__init__.py @@ -16,3 +16,4 @@ from .PB_PidEnum_pb2 import PB_PidEnum # noqa from .PB_Controller_pb2 import PB_ControllerEnum, PB_Controller # noqa from .PB_ParameterCommands_pb2 import PB_ParameterGetRequest, PB_ParameterSetRequest, PB_ParameterGetResponse, PB_ParameterSetResponse, PB_ParameterStatus # noqa +from .PB_Telemetry_pb2 import PB_TelemetryData diff --git a/cogip/tools/copilot/copilot.py b/cogip/tools/copilot/copilot.py index e5f8b356a..927fad9ab 100644 --- a/cogip/tools/copilot/copilot.py +++ b/cogip/tools/copilot/copilot.py @@ -10,7 +10,17 @@ from cogip.cpp.libraries.models import PoseOrderList as SharedPoseOrderList from cogip.cpp.libraries.shared_memory import LockName, SharedMemory, WritePriorityLock from cogip.models.actuators import ActuatorsKindEnum -from cogip.protobuf import PB_ActuatorState, PB_PathPose, PB_Pid, PB_PidEnum, PB_Pose, PB_State +from cogip.protobuf import ( + PB_ActuatorState, + PB_ParameterGetResponse, + PB_ParameterSetResponse, + PB_PathPose, + PB_Pid, + PB_PidEnum, + PB_Pose, + PB_State, + PB_TelemetryData, +) from . import logger from .pbcom import PBCom, pb_exception_handler from .pid import Pid @@ -37,6 +47,13 @@ reset_uuid: int = 0x3001 copilot_connected_uuid: int = 0x3002 copilot_disconnected_uuid: int = 0x3003 +parameter_set_uuid: int = 0x3004 +parameter_set_response_uuid: int = 0x3005 +parameter_get_uuid: int = 0x3006 +parameter_get_response_uuid: int = 0x3007 +telemetry_enable_uuid: int = 0x3008 +telemetry_disable_uuid: int = 0x3009 +telemetry_data_uuid: int = 0x300A # Game: 0x4000 - 0x4FFF game_start_uuid: int = 0x4001 game_end_uuid: int = 0x4002 @@ -88,6 +105,9 @@ def __init__(self, server_url: str, id: int, can_channel: str, can_bitrate: int, actuator_state_uuid: self.handle_actuator_state, pid_uuid: self.handle_pid, blocked_uuid: self.handle_blocked, + parameter_get_response_uuid: self.handle_parameter_get_response, + parameter_set_response_uuid: self.handle_parameter_set_response, + telemetry_data_uuid: self.handle_telemetry_data, } self.pbcom = PBCom(can_channel, can_bitrate, canfd_data_bitrate, pb_message_handlers) @@ -283,6 +303,75 @@ async def handle_blocked(self) -> None: if self.sio_events.connected: await self.sio_events.emit("blocked") + @pb_exception_handler + async def handle_parameter_get_response(self, message: bytes | None = None): + """ + Handle parameter get response from firmware. + + Forward response to the firmware_parameter_manager. + """ + pb_response = PB_ParameterGetResponse() + + if message: + await self.loop.run_in_executor(None, pb_response.ParseFromString, message) + + response = MessageToDict( + pb_response, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=True, + ) + logger.info(f"[CAN] get_response: {response}") + + if self.sio_events.connected: + await self.sio_events.emit("get_parameter_response", response) + + @pb_exception_handler + async def handle_parameter_set_response(self, message: bytes | None = None): + """ + Handle parameter set response from firmware. + + Forward response to the firmware_parameter_manager. + """ + pb_response = PB_ParameterSetResponse() + + if message: + await self.loop.run_in_executor(None, pb_response.ParseFromString, message) + + response = MessageToDict( + pb_response, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=True, + ) + logger.info(f"[CAN] set_response: {response}") + + if self.sio_events.connected: + await self.sio_events.emit("set_parameter_response", response) + + @pb_exception_handler + async def handle_telemetry_data(self, message: bytes | None = None): + """ + Handle parameter telemetry data from firmware. + + Forward response to the TODO: + """ + pb_telemetry = PB_TelemetryData() + + if message: + await self.loop.run_in_executor(None, pb_telemetry.ParseFromString, message) + + telemetry = MessageToDict( + pb_telemetry, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=True, + ) + logger.debug(f"[CAN] telemetry data: {telemetry}") + + if self.sio_events.connected: + await self.sio_events.emit("telemetry_data", telemetry) + async def new_path_event_loop(self): """ Async worker watching for new path orders in shared memory. diff --git a/cogip/tools/copilot/sio_events.py b/cogip/tools/copilot/sio_events.py index 1d7a5f04e..a1c0c3f3a 100644 --- a/cogip/tools/copilot/sio_events.py +++ b/cogip/tools/copilot/sio_events.py @@ -6,8 +6,17 @@ from pydantic import TypeAdapter from cogip import models +from cogip.models import FirmwareParameter from cogip.models.actuators import ActuatorCommand, PositionalActuatorCommand -from cogip.protobuf import PB_ActuatorCommand, PB_Controller, PB_PathPose, PB_Pid_Id, PB_PidEnum +from cogip.protobuf import ( + PB_ActuatorCommand, + PB_Controller, + PB_ParameterGetRequest, + PB_ParameterSetRequest, + PB_PathPose, + PB_Pid_Id, + PB_PidEnum, +) from . import copilot, logger from .menu import menu @@ -98,6 +107,19 @@ async def on_pose_start(self, data: dict[str, Any]): start_pose.copy_pb(pb_start_pose) await self.copilot.pbcom.send_can_message(copilot.pose_start_uuid, pb_start_pose) + async def on_pose_order(self, data: dict[str, Any]): + """ + Callback on pose order (from planner). + Forward to mcu-firmware. + """ + logger.info(f"[SIO] Pose order: {data}") + pose_order = models.PathPose.model_validate(data) + if self.copilot.id > 1: + pose_order.motion_direction = models.MotionDirection.FORWARD_ONLY + pb_pose_order = PB_PathPose() + pose_order.copy_pb(pb_pose_order) + await self.copilot.pbcom.send_can_message(copilot.pose_order_uuid, pb_pose_order) + async def on_actuators_start(self): """ Callback on actuators_start (from dashboard). @@ -183,3 +205,45 @@ async def on_brake(self): """ logger.info("[SIO] Brake") await self.copilot.pbcom.send_can_message(copilot.brake_uuid, None) + + async def on_get_parameter_value(self, data: dict[str, Any]): + """ + Callback on get_parameter_value. + Forward to firmware. + """ + logger.info(f"[SIO] Get parameter: {data}") + + parameter = FirmwareParameter.model_validate(data) + pb_get_request = PB_ParameterGetRequest() + parameter.pb_copy(pb_get_request) + + await self.copilot.pbcom.send_can_message(copilot.parameter_get_uuid, pb_get_request) + + async def on_set_parameter_value(self, data: dict[str, Any]): + """ + Callback on set_parameter_value. + Forward to firmware. + """ + logger.info(f"[SIO] Set parameter: {data}") + + parameter = FirmwareParameter.model_validate(data) + pb_set_request = PB_ParameterSetRequest() + parameter.pb_copy(pb_set_request) + + await self.copilot.pbcom.send_can_message(copilot.parameter_set_uuid, pb_set_request) + + async def on_telemetry_enable(self, data: dict[str, Any] | None = None): + """ + Callback on telemetry enable message. + Forward to firmware. + """ + logger.info("[SIO] Telemetry enable") + await self.copilot.pbcom.send_can_message(copilot.telemetry_enable_uuid, None) + + async def on_telemetry_disable(self, data: dict[str, Any] | None = None): + """ + Callback on telemetry disable message. + Forward to firmware. + """ + logger.info("[SIO] Telemetry disable") + await self.copilot.pbcom.send_can_message(copilot.telemetry_disable_uuid, None) diff --git a/cogip/tools/firmware_odometry_calibration/__init__.py b/cogip/tools/firmware_odometry_calibration/__init__.py new file mode 100644 index 000000000..8528f3515 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/__init__.py @@ -0,0 +1,5 @@ +import logging + +from cogip.utils.logger import Logger + +logger = Logger("cogip-odometry-calibration", level=logging.WARNING) diff --git a/cogip/tools/firmware_odometry_calibration/__main__.py b/cogip/tools/firmware_odometry_calibration/__main__.py new file mode 100644 index 000000000..b75bce042 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/__main__.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +""" +CLI entry point for the Odometry Calibration tool. + +Provides a command-line interface to calibrate robot odometry parameters +by communicating with the firmware via SocketIO through cogip-server. +""" + +import asyncio +import logging +from pathlib import Path +from typing import Annotated + +import typer +import yaml + +from cogip.models import FirmwareParametersGroup +from cogip.tools.firmware_odometry_calibration import logger +from cogip.tools.firmware_odometry_calibration.odometry_calibration import OdometryCalibration + + +def main_opt( + *, + server_url: Annotated[ + str | None, + typer.Option( + "-s", + "--server-url", + help="cogip-server URL", + envvar="COGIP_SOCKETIO_SERVER_URL", + ), + ] = None, + robot_id: Annotated[ + int, + typer.Option( + "-i", + "--id", + min=1, + help="Robot ID.", + envvar=["ROBOT_ID"], + ), + ] = 1, + debug: Annotated[ + bool, + typer.Option( + "-d", + "--debug", + help="Turn on debug messages", + envvar="CALIBRATION_DEBUG", + ), + ] = False, +): + if debug: + logger.setLevel(logging.DEBUG) + + if not server_url: + server_url = f"http://localhost:809{robot_id}" + + # Load bundled parameters definition YAML + params_path = Path(__file__).with_name("odometry_parameters.yaml") + parameters_data = yaml.safe_load(params_path.read_text()) + + parameters_group = FirmwareParametersGroup.model_validate(parameters_data["parameters"]) + + # Run calibration + calibration = OdometryCalibration(server_url, parameters_group) + asyncio.run(calibration.run()) + + +def main(): + """ + Run odometry calibration tool. + + During installation of cogip-tools, `setuptools` is configured + to create the `cogip-odometry-calibration` script using this function as entrypoint. + """ + typer.run(main_opt) + + +if __name__ == "__main__": + main() diff --git a/cogip/tools/firmware_odometry_calibration/calculator.py b/cogip/tools/firmware_odometry_calibration/calculator.py new file mode 100644 index 000000000..f674141af --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/calculator.py @@ -0,0 +1,260 @@ +""" +Odometry Calibration Calculator. + +Pure mathematical functions for computing calibration parameters. + +Calibrates three parameters for differential drive odometry: +wheel distance, left wheel radius, and right wheel radius. + +Calibration process: + Phase 1 (Turn in Place): Spin N rotations to compute wheel distance from encoder arcs + Phase 2 (Square Path): Drive squares to isolate linear motion and find wheel radius ratio (beta) + Phase 3 (Straight Line): Drive known distance to solve for actual wheel radius using beta +""" + +import math + +from cogip.models import CalibrationResult, CalibrationState + + +def compute_alpha_coefficients( + turns: int, + lticks_delta: int, + rticks_delta: int, + encoder_ticks: float, +) -> tuple[float, float]: + """ + Compute alpha coefficients from turn-in-place data. + + Alpha represents wheel rotations per robot rotation. + + Args: + turns: Number of full rotations performed + lticks_delta: Change in left encoder ticks + rticks_delta: Change in right encoder ticks + encoder_ticks: Encoder ticks per wheel revolution + + Returns: + Tuple of (alpha_l, alpha_r) + """ + alpha_l = lticks_delta / (encoder_ticks * turns) + alpha_r = rticks_delta / (encoder_ticks * turns) + return alpha_l, alpha_r + + +def compute_wheel_distance( + alpha_l: float, + alpha_r: float, + left_wheel_radius: float, + right_wheel_radius: float, +) -> float: + """ + Compute wheel distance from alpha coefficients and wheel radii. + + Formula: axletrack = |alpha_l| * radius_l + |alpha_r| * radius_r + + During rotation in place, wheels turn in opposite directions, so alpha + coefficients have opposite signs. We use absolute values to sum their + contributions. + + Args: + alpha_l: Left wheel rotations per robot rotation (may be negative) + alpha_r: Right wheel rotations per robot rotation (may be negative) + left_wheel_radius: Current left wheel radius estimate + right_wheel_radius: Current right wheel radius estimate + + Returns: + Computed wheel distance in mm + """ + return abs(alpha_l) * left_wheel_radius + abs(alpha_r) * right_wheel_radius + + +def compute_wheel_distance_result( + turns: int, + lticks_delta: int, + rticks_delta: int, + encoder_ticks: float, + left_wheel_radius: float, + right_wheel_radius: float, + left_polarity: float = 1.0, + right_polarity: float = 1.0, +) -> tuple[CalibrationResult, CalibrationState]: + """ + Phase 1: Compute wheel distance from turn-in-place data. + + Args: + turns: Number of full rotations performed + lticks_delta: Change in left encoder ticks (raw) + rticks_delta: Change in right encoder ticks (raw) + encoder_ticks: Encoder ticks per wheel revolution + left_wheel_radius: Current left wheel radius + right_wheel_radius: Current right wheel radius + left_polarity: Left encoder polarity correction (+1 or -1) + right_polarity: Right encoder polarity correction (+1 or -1) + + Returns: + Tuple of (CalibrationResult, updated CalibrationState) + """ + # Apply polarity correction + lticks_delta = int(lticks_delta * left_polarity) + rticks_delta = int(rticks_delta * right_polarity) + + alpha_l, alpha_r = compute_alpha_coefficients(turns, lticks_delta, rticks_delta, encoder_ticks) + new_wheels_distance = compute_wheel_distance(alpha_l, alpha_r, left_wheel_radius, right_wheel_radius) + + result = CalibrationResult( + wheels_distance=new_wheels_distance, + right_wheel_radius=right_wheel_radius, + left_wheel_radius=left_wheel_radius, + ) + + state = CalibrationState( + alpha_l=alpha_l, + alpha_r=alpha_r, + beta=0.0, + ) + + return result, state + + +def compute_beta_coefficient( + lticks_linear: float, + rticks_linear: float, +) -> float | None: + """ + Compute beta coefficient (radius ratio) from linear encoder components. + + For straight-line motion, both wheels travel the same distance D: + D = 2π * radius_l * (lticks / encoder_ticks) + D = 2π * radius_r * (rticks / encoder_ticks) + + Therefore: radius_r / radius_l = lticks / rticks + + Beta = radius_r / radius_l = lticks_linear / rticks_linear + + Args: + lticks_linear: Linear component of left encoder ticks + rticks_linear: Linear component of right encoder ticks + + Returns: + Beta coefficient (radius_r / radius_l), or None if rticks_linear is too small + """ + if abs(rticks_linear) < 1: + return None + return lticks_linear / rticks_linear + + +def compute_right_wheel_radius_result( + squares: int, + lticks_delta: int, + rticks_delta: int, + state: CalibrationState, + encoder_ticks: float, + left_wheel_radius: float, + left_polarity: float = 1.0, + right_polarity: float = 1.0, +) -> tuple[CalibrationResult, CalibrationState] | None: + """ + Phase 2: Compute right wheel radius from square trajectory data. + + Args: + squares: Number of square paths performed + lticks_delta: Change in left encoder ticks (raw) + rticks_delta: Change in right encoder ticks (raw) + state: Current calibration state with alpha coefficients + encoder_ticks: Encoder ticks per wheel revolution + left_wheel_radius: Current left wheel radius + left_polarity: Left encoder polarity correction (+1 or -1) + right_polarity: Right encoder polarity correction (+1 or -1) + + Returns: + Tuple of (CalibrationResult, updated CalibrationState), or None if computation fails + """ + # Apply polarity correction + lticks_delta = int(lticks_delta * left_polarity) + rticks_delta = int(rticks_delta * right_polarity) + + # Subtract rotation component to get linear component + lticks_linear = lticks_delta - (state.alpha_l * encoder_ticks * squares) + rticks_linear = rticks_delta - (state.alpha_r * encoder_ticks * squares) + + beta = compute_beta_coefficient(lticks_linear, rticks_linear) + if beta is None: + return None + + new_right_wheel_radius = beta * left_wheel_radius + new_wheels_distance = compute_wheel_distance( + state.alpha_l, state.alpha_r, left_wheel_radius, new_right_wheel_radius + ) + + result = CalibrationResult( + wheels_distance=new_wheels_distance, + right_wheel_radius=new_right_wheel_radius, + left_wheel_radius=left_wheel_radius, + ) + + updated_state = CalibrationState( + alpha_l=state.alpha_l, + alpha_r=state.alpha_r, + beta=beta, + ) + + return result, updated_state + + +def compute_left_wheel_radius_result( + distance_mm: int, + lticks_delta: int, + rticks_delta: int, + state: CalibrationState, + encoder_ticks: float, + left_polarity: float = 1.0, + right_polarity: float = 1.0, +) -> tuple[CalibrationResult, CalibrationState] | None: + """ + Phase 3: Compute left wheel radius from straight line data. + + For straight-line motion of distance D: + D = 2π * radius_l * (lticks / encoder_ticks) + D = 2π * radius_r * (rticks / encoder_ticks) + + With radius_r = beta * radius_l, summing both equations: + 2D = 2π * radius_l * (lticks + beta * rticks) / encoder_ticks + + Formula: radius_l = D * encoder_ticks / (π * (lticks + beta * rticks)) + + Args: + distance_mm: Distance traveled in mm + lticks_delta: Change in left encoder ticks (raw) + rticks_delta: Change in right encoder ticks (raw) + state: Current calibration state with alpha and beta coefficients + encoder_ticks: Encoder ticks per wheel revolution + left_polarity: Left encoder polarity correction (+1 or -1) + right_polarity: Right encoder polarity correction (+1 or -1) + + Returns: + Tuple of (CalibrationResult, CalibrationState), or None if computation fails + """ + # Apply polarity correction + lticks_delta = int(lticks_delta * left_polarity) + rticks_delta = int(rticks_delta * right_polarity) + + denominator = math.pi * (lticks_delta + state.beta * rticks_delta) + + if abs(denominator) < 1: + return None + + new_left_wheel_radius = (distance_mm * encoder_ticks) / denominator + new_right_wheel_radius = state.beta * new_left_wheel_radius + new_wheels_distance = compute_wheel_distance( + state.alpha_l, state.alpha_r, new_left_wheel_radius, new_right_wheel_radius + ) + + result = CalibrationResult( + wheels_distance=new_wheels_distance, + right_wheel_radius=new_right_wheel_radius, + left_wheel_radius=new_left_wheel_radius, + ) + + # State unchanged in phase 3 + return result, state diff --git a/cogip/tools/firmware_odometry_calibration/firmware_adapter.py b/cogip/tools/firmware_odometry_calibration/firmware_adapter.py new file mode 100644 index 000000000..a460bcef6 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/firmware_adapter.py @@ -0,0 +1,343 @@ +""" +Firmware Adapter for Odometry Calibration + +Unified facade for all firmware interactions via SocketIO: +- Parameter load/save operations +- Encoder telemetry +- Motion control via pose_order/pose_reached +- Calibration-specific movement sequences +""" + +from __future__ import annotations +import asyncio +import math + +import socketio +from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn + +from cogip.models import OdometryParameters +from cogip.models.models import Pose +from cogip.tools.firmware_parameter_manager.firmware_parameter_manager import FirmwareParameterManager +from cogip.tools.firmware_telemetry.firmware_telemetry_manager import FirmwareTelemetryManager +from cogip.utils.console_ui import ConsoleUI +from . import logger +from .types import SQUARE_PATH_CCW + + +class FirmwareAdapter: + """Unified adapter for firmware operations via SocketIO. + + Handles odometry parameter load/save, encoder telemetry, motion control + via pose_order/pose_reached events, and calibration movement sequences. + """ + + # Parameter name constants + PARAM_WHEELS_DISTANCE = "encoder_wheels_distance_mm" + PARAM_RIGHT_WHEEL_DIAMETER = "right_wheel_diameter_mm" + PARAM_LEFT_WHEEL_DIAMETER = "left_wheel_diameter_mm" + PARAM_LEFT_POLARITY = "qdec_left_polarity" + PARAM_RIGHT_POLARITY = "qdec_right_polarity" + PARAM_ENCODER_RESOLUTION = "encoder_wheels_resolution_pulses" + + # Telemetry keys for encoder values + TELEMETRY_LEFT_ENCODER = "encoder_left" + TELEMETRY_RIGHT_ENCODER = "encoder_right" + + def __init__( + self, + sio: socketio.AsyncClient, + param_manager: FirmwareParameterManager, + telemetry_manager: FirmwareTelemetryManager, + pose_reached_event: asyncio.Event, + console: ConsoleUI | None = None, + ): + """ + Initialize the firmware adapter. + + Args: + sio: SocketIO client for communication + param_manager: Firmware parameter manager for read/write operations + telemetry_manager: Firmware telemetry manager for encoder tick counts + pose_reached_event: Event signaled when robot reaches target position + console: Optional ConsoleUI for progress display + """ + self.sio = sio + self._param_manager = param_manager + self._telemetry = telemetry_manager + self.pose_reached_event = pose_reached_event + self._console = console or ConsoleUI() + + # === Parameters === + + async def load_parameters(self) -> OdometryParameters: + """ + Load current odometry parameters from firmware. + + Returns: + OdometryParameters populated with values from firmware + + Raises: + TimeoutError: If firmware communication times out + """ + logger.info("Loading parameters from firmware...") + ( + wheels_distance, + right_diameter, + left_diameter, + left_polarity, + right_polarity, + encoder_ticks, + ) = await asyncio.gather( + self._param_manager.get_parameter_value(self.PARAM_WHEELS_DISTANCE), + self._param_manager.get_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER), + self._param_manager.get_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER), + self._param_manager.get_parameter_value(self.PARAM_LEFT_POLARITY), + self._param_manager.get_parameter_value(self.PARAM_RIGHT_POLARITY), + self._param_manager.get_parameter_value(self.PARAM_ENCODER_RESOLUTION), + ) + + params = OdometryParameters( + wheels_distance=wheels_distance, + right_wheel_radius=right_diameter / 2.0, + left_wheel_radius=left_diameter / 2.0, + left_polarity=left_polarity, + right_polarity=right_polarity, + encoder_ticks=encoder_ticks, + ) + + logger.info(f"Loaded parameters: {params}") + + return params + + async def save_parameters(self, params: OdometryParameters) -> None: + """ + Save odometry parameters to firmware. + + Only saves the calibrated values (wheels_distance, wheel radii). + Polarity and encoder ticks are not modified. + + Args: + params: OdometryParameters with values to save + + Raises: + TimeoutError: If firmware communication times out + """ + logger.info(f"Saving parameters to firmware: {params}") + + await asyncio.gather( + self._param_manager.set_parameter_value(self.PARAM_WHEELS_DISTANCE, params.wheels_distance), + self._param_manager.set_parameter_value(self.PARAM_RIGHT_WHEEL_DIAMETER, params.right_wheel_radius * 2.0), + self._param_manager.set_parameter_value(self.PARAM_LEFT_WHEEL_DIAMETER, params.left_wheel_radius * 2.0), + ) + + logger.info("Parameters saved successfully") + + # === Telemetry === + + async def get_encoder_ticks(self) -> tuple[int, int]: + """ + Get current encoder tick counts from firmware telemetry. + + Waits briefly for telemetry to stabilize before reading values. + + Returns: + Tuple of (left_ticks, right_ticks) + """ + await asyncio.sleep(0.1) + + left = self._telemetry.get_value(self.TELEMETRY_LEFT_ENCODER) + right = self._telemetry.get_value(self.TELEMETRY_RIGHT_ENCODER) + + return int(left), int(right) + + # === Motion Control === + + async def set_start_position(self, x: float, y: float, orientation: float) -> None: + """ + Set the robot's starting reference position. + + Args: + x: X coordinate in mm + y: Y coordinate in mm + orientation: Orientation in degrees + """ + pose = Pose(x=x, y=y, O=orientation) + + logger.debug(f"Setting start position: {pose}") + + await self.sio.emit("pose_start", pose.model_dump(), namespace="/calibration") + + async def _send_pose_order(self, x: float, y: float, orientation: float) -> None: + """ + Send a pose order to the robot. + + Args: + x: Target X coordinate in mm + y: Target Y coordinate in mm + orientation: Target orientation in degrees + """ + pose = Pose(x=x, y=y, O=orientation) + self.pose_reached_event.clear() + + logger.debug(f"Sending pose order: {pose}") + + await self.sio.emit("pose_order", pose.model_dump(), namespace="/calibration") + + async def _wait_pose_reached(self, timeout: float = 60.0) -> bool: + """ + Wait for the robot to reach its target position. + + Args: + timeout: Maximum time to wait in seconds + + Returns: + True if pose was reached, False if timeout + """ + try: + await asyncio.wait_for(self.pose_reached_event.wait(), timeout=timeout) + return True + except TimeoutError: + logger.warning(f"Timeout waiting for pose reached (>{timeout}s)") + return False + + async def goto(self, x: float, y: float, orientation: float, timeout: float = 60.0) -> bool: + """ + Move robot to target position and wait for completion. + + Args: + x: Target X coordinate in mm + y: Target Y coordinate in mm + orientation: Target orientation in degrees + timeout: Maximum time to wait in seconds + + Returns: + True if movement completed, False if timeout + """ + await self._send_pose_order(x, y, orientation) + return await self._wait_pose_reached(timeout) + + # === Calibration Movement Sequences === + + async def execute_rotations(self, num_rotations: int) -> bool: + """ + Execute N full rotations in place (Phase 1: WHEEL_DISTANCE). + + Args: + num_rotations: Number of full 360-degree rotations + + Returns: + True if successful, False otherwise. + """ + target_orientation = num_rotations * 360.0 + logger.info(f"Executing {num_rotations} rotations ({target_orientation}°)...") + + # Reset position to origin + await self.set_start_position(0, 0, 0) + await asyncio.sleep(0.2) + + quarter_turns = [90, 180, -90, 0] + + with Progress( + SpinnerColumn(), + TextColumn("[cyan]Rotation {task.fields[rotation]}/{task.fields[total_count]}[/]"), + TimeElapsedColumn(), + console=self._console, + ) as progress: + task = progress.add_task("", rotation=1, total_count=num_rotations) + + for rotation_num in range(num_rotations): + progress.update(task, rotation=rotation_num + 1) + logger.debug(f"Rotation {rotation_num + 1}/{num_rotations}") + for target_angle in quarter_turns: + success = await self.goto(0, 0, target_angle, timeout=30.0) + if not success: + self._console.show_error( + f"Rotation {rotation_num + 1}/{num_rotations} failed at {target_angle}° (timeout)" + ) + return False + + progress.update(task, rotation=num_rotations) + + logger.info("Rotations completed") + return True + + async def execute_squares(self, num_squares: int) -> bool: + """ + Execute N square paths clockwise (Phase 2: RIGHT_WHEEL_RADIUS). + + Uses the predefined 500mm square path from __init__.py. + + Args: + num_squares: Number of complete squares to execute + + Returns: + True if successful, False otherwise. + """ + logger.info(f"Executing {num_squares} squares...") + + # Reset position to origin + await self.set_start_position(0, 0, 0) + await asyncio.sleep(0.2) + + with Progress( + SpinnerColumn(), + TextColumn("[cyan]Square {task.fields[square]}/{task.fields[total_count]}[/]"), + TimeElapsedColumn(), + console=self._console, + ) as progress: + task = progress.add_task("", square=1, total_count=num_squares) + + for square_num in range(num_squares): + progress.update(task, square=square_num + 1) + for waypoint_idx, waypoint in enumerate(SQUARE_PATH_CCW): + orientation_degrees = math.degrees(waypoint.O) + logger.debug( + f"Square {square_num + 1}/{num_squares}, waypoint {waypoint_idx + 1}/4: " + f"({waypoint.x}, {waypoint.y}, {orientation_degrees}°)" + ) + + success = await self.goto(waypoint.x, waypoint.y, orientation_degrees, timeout=30.0) + if not success: + self._console.show_error( + f"Square path failed at waypoint {waypoint_idx + 1}/4 of square {square_num + 1}" + ) + return False + + progress.update(task, square=num_squares) + + logger.info("Squares completed") + return True + + async def execute_straight_line(self, distance_mm: int) -> bool: + """ + Execute a straight line movement (Phase 3: LEFT_WHEEL_RADIUS). + + Moves forward from origin by the specified distance. + + Args: + distance_mm: Distance to travel in mm + + Returns: + True if successful, False otherwise. + """ + logger.info(f"Executing straight line movement ({distance_mm}mm)...") + + # Reset position to origin facing forward (0°) + await self.set_start_position(0, 0, 0) + await asyncio.sleep(0.2) + + with Progress( + SpinnerColumn(), + TextColumn("[cyan]Straight line ({task.fields[distance]} mm)[/]"), + TimeElapsedColumn(), + console=self._console, + ) as progress: + progress.add_task("", distance=distance_mm) + + success = await self.goto(distance_mm, 0, 0, timeout=30.0) + if not success: + self._console.show_error("Straight line movement failed (timeout)") + return False + + logger.info("Straight line completed") + return True diff --git a/cogip/tools/firmware_odometry_calibration/odometry_calibration.py b/cogip/tools/firmware_odometry_calibration/odometry_calibration.py new file mode 100644 index 000000000..279339fc4 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/odometry_calibration.py @@ -0,0 +1,385 @@ +""" +Odometry Calibration Tool for differential drive robots. + +Calibrates three parameters: wheel distance, left/right wheel radius. + +Phases: + 1. Wheel Distance - Turn in place N rotations + 2. Right Wheel Radius - Execute square trajectories to find radius ratio + 3. Left Wheel Radius - Travel straight line to compute absolute radius + +Usage: + - Start with approximate parameter values in firmware + - Execute phases in order (each depends on previous results) + - Manually reposition robot to theoretical position after each motion +""" + +from __future__ import annotations +import asyncio +import sys +from collections.abc import Awaitable, Callable + +import socketio + +from cogip.models import ( + CalibrationResult, + CalibrationState, + EncoderDeltas, + FirmwareParametersGroup, + OdometryParameters, +) +from cogip.tools.firmware_parameter_manager.firmware_parameter_manager import FirmwareParameterManager +from cogip.tools.firmware_telemetry.firmware_telemetry_manager import FirmwareTelemetryManager +from cogip.utils.console_ui import ConsoleUI +from . import logger +from .calculator import ( + compute_left_wheel_radius_result, + compute_right_wheel_radius_result, + compute_wheel_distance_result, +) +from .firmware_adapter import FirmwareAdapter +from .sio_events import SioEvents +from .types import CalibrationPhaseType + + +class OdometryCalibration: + """ + Odometry Calibration controller. + + Orchestrates the calibration process by coordinating: + - SocketIO connection to cogip-server + - FirmwareAdapter for all firmware operations + - User interaction via ConsoleUI + """ + + def __init__(self, server_url: str, parameters_group: FirmwareParametersGroup): + """ + Initialize the calibration controller. + + Args: + server_url: URL of the cogip-server + parameters_group: Firmware parameters configuration + """ + self.server_url = server_url + self.console = ConsoleUI() + + # Event to signal pose reached + self.pose_reached_event = asyncio.Event() + + # SocketIO client and namespaces + self.sio = socketio.AsyncClient(logger=False) + self.sio_ns = SioEvents(self) + self.sio.register_namespace(self.sio_ns) + + # Managers (they register their own namespaces) + self.param_manager = FirmwareParameterManager(self.sio, parameters_group) + self.telemetry_manager = FirmwareTelemetryManager(self.sio) + + # Firmware adapter for motion control + self.firmware = FirmwareAdapter( + self.sio, self.param_manager, self.telemetry_manager, self.pose_reached_event, self.console + ) + + # Calibration state + self.params: OdometryParameters | None = None + self.initial_params: OdometryParameters | None = None + self.state = CalibrationState() + + async def _connect(self) -> None: + """Connect to cogip-server.""" + self.console.show_info(f"Connecting to {self.server_url}...") + await self.sio.connect( + self.server_url, + namespaces=[ + self.sio_ns.namespace, + self.param_manager.namespace, + self.telemetry_manager.namespace, + ], + ) + + # Wait for all namespaces to be connected + self.console.show_info("Waiting for connections...") + while not (self.sio_ns.connected and self.param_manager.is_connected and self.telemetry_manager.is_connected): + await asyncio.sleep(0.1) + + self.console.show_success("Connected to cogip-server") + + # Enable telemetry + await self.telemetry_manager.enable() + await asyncio.sleep(0.2) + + async def _disconnect(self) -> None: + """Disconnect from server.""" + if self.sio and self.sio.connected: + await self.sio.disconnect() + + def _display_intro(self) -> None: + """Display introduction panel.""" + self.console.show_panel( + "This tool calibrates the robot's odometry parameters through 3 phases:\n" + "1. [header]Wheel Distance[/] - Robot will rotates in place\n" + "2. [header]Right Wheel Radius[/] - Robot will performs square paths\n" + "3. [header]Left Wheel Radius[/] - Robot will travels along a straight line", + title="Odometry Calibration Tool", + ) + + def _display_parameters(self, params: OdometryParameters, title: str) -> None: + """Display parameters in a table.""" + self.console.show_key_value_table( + [ + ("Wheels Distance", f"{params.wheels_distance:.3f} mm"), + ("Left Wheel Radius", f"{params.left_wheel_radius:.3f} mm"), + ("Right Wheel Radius", f"{params.right_wheel_radius:.3f} mm"), + ("Left Polarity", f"{params.left_polarity:.0f}"), + ("Right Polarity", f"{params.right_polarity:.0f}"), + ("Encoder Ticks", f"{params.encoder_ticks:.0f} ticks/rev"), + ], + title=title, + ) + + def _display_result(self, result: CalibrationResult, current: OdometryParameters) -> None: + """Display calibration result comparison.""" + change_wd = result.wheels_distance - current.wheels_distance + change_lr = result.left_wheel_radius - current.left_wheel_radius + change_rr = result.right_wheel_radius - current.right_wheel_radius + + table = self.console.create_table( + title="Calibration Result", + columns=[ + ("Parameter", {"style": "label"}), + ("Before", {"style": "muted", "justify": "right"}), + ("After", {"style": "value", "justify": "right"}), + ("Change", {"style": "warning", "justify": "right"}), + ], + ) + table.add_row( + "Wheels Distance (mm)", + f"{current.wheels_distance:.3f}", + f"{result.wheels_distance:.3f}", + f"{change_wd:+.3f}", + ) + table.add_row( + "Left Wheel Radius (mm)", + f"{current.left_wheel_radius:.3f}", + f"{result.left_wheel_radius:.3f}", + f"{change_lr:+.3f}", + ) + table.add_row( + "Right Wheel Radius (mm)", + f"{current.right_wheel_radius:.3f}", + f"{result.right_wheel_radius:.3f}", + f"{change_rr:+.3f}", + ) + self.console.print(table) + + async def _apply_result(self, result: CalibrationResult) -> None: + """Apply calibration result to current parameters and save to firmware.""" + self.params.wheels_distance = result.wheels_distance + self.params.right_wheel_radius = result.right_wheel_radius + self.params.left_wheel_radius = result.left_wheel_radius + + self.console.show_info("Saving parameters to firmware...") + await self.firmware.save_parameters(self.params) + self.console.show_success("Parameters saved to firmware") + + async def _execute_motion_sequence( + self, + motion_fn: Callable[[int], Awaitable[bool]], + motion_arg: int, + ) -> EncoderDeltas | None: + """ + Execute motion and collect encoder tick deltas. + + Args: + motion_fn: Async function to execute the motion (e.g., execute_rotations). + motion_arg: Argument to pass to the motion function (e.g., number of rotations). + + Returns: + EncoderDeltas or None if motion failed. + """ + await self.console.wait_for_enter("Position the robot at the starting position") + left_before, right_before = await self.firmware.get_encoder_ticks() + logger.info(f"Encoder ticks before: L={left_before}, R={right_before}") + + if not await motion_fn(motion_arg): + return None + + await self.console.wait_for_enter("Reposition the robot to its theoretical position") + left_after, right_after = await self.firmware.get_encoder_ticks() + logger.info(f"Encoder ticks after: L={left_after}, R={right_after}") + + deltas = EncoderDeltas(left=left_after - left_before, right=right_after - right_before) + logger.info(f"Encoder deltas: L={deltas.left}, R={deltas.right}") + + return deltas + + async def _handle_phase_result( + self, + result_tuple: tuple[CalibrationResult, CalibrationState] | None, + phase: CalibrationPhaseType, + ) -> bool: + """ + Handle calibration result: display, confirm, and apply. + + Args: + result_tuple: The computation result or None if computation failed. + phase: The calibration phase type for error messages. + + Returns: + True if result was accepted and applied, False otherwise. + """ + if result_tuple is None: + self.console.show_error(phase.error_message) + return False + + result, new_state = result_tuple + self._display_result(result, self.params) + + accepted = await self.console.confirm("Accept this result?") + self.console.show_info(f"Accepted: {accepted}") + + if accepted: + await self._apply_result(result) + self.state = new_state + return True + + return False + + async def _run_phase_1(self) -> bool: + """Phase 1: Wheel Distance Calibration (Turn in Place).""" + phase = CalibrationPhaseType.WHEEL_DISTANCE + + self.console.show_rule(phase.title) + self.console.show_info(phase.description) + + num_rotations = await self.console.get_integer(phase.input_prompt, default=phase.default_value) + + deltas = await self._execute_motion_sequence(self.firmware.execute_rotations, num_rotations) + if deltas is None: + return False + + result_tuple = compute_wheel_distance_result( + turns=num_rotations, + lticks_delta=deltas.left, + rticks_delta=deltas.right, + encoder_ticks=self.params.encoder_ticks, + left_wheel_radius=self.params.left_wheel_radius, + right_wheel_radius=self.params.right_wheel_radius, + left_polarity=self.params.left_polarity, + right_polarity=self.params.right_polarity, + ) + + return await self._handle_phase_result(result_tuple, phase) + + async def _run_phase_2(self) -> bool: + """Phase 2: Right Wheel Radius Calibration (Square Trajectories).""" + phase = CalibrationPhaseType.RIGHT_WHEEL_RADIUS + + self.console.show_rule(phase.title) + self.console.show_info(phase.description) + + num_squares = await self.console.get_integer(phase.input_prompt, default=phase.default_value) + + deltas = await self._execute_motion_sequence(self.firmware.execute_squares, num_squares) + if deltas is None: + return False + + result_tuple = compute_right_wheel_radius_result( + squares=num_squares, + lticks_delta=deltas.left, + rticks_delta=deltas.right, + state=self.state, + encoder_ticks=self.params.encoder_ticks, + left_wheel_radius=self.params.left_wheel_radius, + left_polarity=self.params.left_polarity, + right_polarity=self.params.right_polarity, + ) + + return await self._handle_phase_result(result_tuple, phase) + + async def _run_phase_3(self) -> bool: + """Phase 3: Left Wheel Radius Calibration (Straight Line).""" + phase = CalibrationPhaseType.LEFT_WHEEL_RADIUS + + self.console.show_rule(phase.title) + self.console.show_info(phase.description) + + distance_mm = await self.console.get_integer(phase.input_prompt, default=phase.default_value) + + deltas = await self._execute_motion_sequence(self.firmware.execute_straight_line, distance_mm) + if deltas is None: + return False + + result_tuple = compute_left_wheel_radius_result( + distance_mm=distance_mm, + lticks_delta=deltas.left, + rticks_delta=deltas.right, + state=self.state, + encoder_ticks=self.params.encoder_ticks, + left_polarity=self.params.left_polarity, + right_polarity=self.params.right_polarity, + ) + + return await self._handle_phase_result(result_tuple, phase) + + async def _run_calibration(self) -> None: + """Run the calibration phases.""" + self._display_intro() + + # Load parameters from firmware + self.console.show_info("Loading odometry parameters from firmware...") + self.params = await self.firmware.load_parameters() + self.initial_params = self.params.model_copy() + self.console.show_success("Parameters loaded successfully") + self._display_parameters(self.params, "Initial Parameters") + + # Phase 1: Wheel distance + while not await self._run_phase_1(): + self.console.show_warning("Retrying Phase 1...") + + # Phase 2: Right wheel radius + while not await self._run_phase_2(): + self.console.show_warning("Retrying Phase 2...") + + # Phase 3: Left wheel radius + while not await self._run_phase_3(): + self.console.show_warning("Retrying Phase 3...") + + # Final summary + self.console.print() + self._display_parameters(self.params, "Final Calibrated Parameters") + + # Save or restore + save_params = await self.console.confirm("Save calibrated parameters to firmware?") + self.console.show_info(f"Save: {save_params}") + if save_params: + self.console.show_info("Saving parameters to firmware...") + await self.firmware.save_parameters(self.params) + self.console.show_success("Parameters saved successfully!") + else: + self.console.show_warning("Restoring initial parameters...") + await self.firmware.save_parameters(self.initial_params) + self.console.show_warning("Initial parameters restored.") + + async def run(self) -> None: + """Main entry point: connect, run calibration, disconnect.""" + try: + await self._connect() + await self._run_calibration() + except KeyboardInterrupt: + self.console.show_warning("\nCalibration interrupted by user") + if self.initial_params and self.firmware: + self.console.show_warning("Restoring initial parameters...") + await self.firmware.save_parameters(self.initial_params) + sys.exit(0) + except Exception as e: + self.console.show_error(str(e)) + logger.error("Unexpected error during calibration") + if self.initial_params and self.firmware: + try: + await self.firmware.save_parameters(self.initial_params) + except Exception: + pass + sys.exit(1) + finally: + await self._disconnect() diff --git a/cogip/tools/firmware_odometry_calibration/odometry_parameters.yaml b/cogip/tools/firmware_odometry_calibration/odometry_parameters.yaml new file mode 100644 index 000000000..0a77419c2 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/odometry_parameters.yaml @@ -0,0 +1,38 @@ +# Odometry Calibration Parameter Definitions +# +# This file defines the mandatory parameters required by the calibration tool +# and their types to ensure compatibility with firmware parameters. +# +# Note: The 'content' values are left at 0 and are unused. Only the parameter +# names and types matter for this definition file. + +parameters: + - name: qdec_left_polarity + value: + type: float + content: 0.0 + + - name: qdec_right_polarity + value: + type: float + content: 0.0 + + - name: left_wheel_diameter_mm + value: + type: float + content: 0.0 + + - name: right_wheel_diameter_mm + value: + type: float + content: 0.0 + + - name: encoder_wheels_distance_mm + value: + type: float + content: 0.0 + + - name: encoder_wheels_resolution_pulses + value: + type: float + content: 0.0 diff --git a/cogip/tools/firmware_odometry_calibration/sio_events.py b/cogip/tools/firmware_odometry_calibration/sio_events.py new file mode 100644 index 000000000..f8fad52d1 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/sio_events.py @@ -0,0 +1,60 @@ +import asyncio +from typing import TYPE_CHECKING, Any + +import polling2 +import socketio + +from . import logger + +if TYPE_CHECKING: + from .odometry_calibration import OdometryCalibration + + +class SioEvents(socketio.AsyncClientNamespace): + """ + Handle all SocketIO events received by the odometry calibration tool. + """ + + def __init__(self, calibration: "OdometryCalibration"): + super().__init__("/calibration") + self.calibration = calibration + self.connected = False + + async def on_connect(self): + """ + On connection to cogip-server. + """ + await asyncio.to_thread( + polling2.poll, + lambda: self.client.connected is True, + step=0.2, + poll_forever=True, + ) + logger.info("Connected to cogip-server on /calibration") + await self.emit("connected") + self.connected = True + + async def on_disconnect(self) -> None: + """ + On disconnection from cogip-server. + """ + logger.info("Disconnected from cogip-server") + self.connected = False + + async def on_connect_error(self, data: dict[str, Any]) -> None: + """ + On connection error. + """ + if isinstance(data, dict) and "message" in data: + message = data["message"] + else: + message = data + logger.error(f"Connection to cogip-server failed: {message}") + + async def on_pose_reached(self) -> None: + """ + Handle pose_reached event from copilot (via server). + Signal that the robot has reached its target position. + """ + logger.debug("Pose reached") + self.calibration.pose_reached_event.set() diff --git a/cogip/tools/firmware_odometry_calibration/types.py b/cogip/tools/firmware_odometry_calibration/types.py new file mode 100644 index 000000000..dee2a6192 --- /dev/null +++ b/cogip/tools/firmware_odometry_calibration/types.py @@ -0,0 +1,83 @@ +""" +Odometry Calibration Types + +Enums and constants for the calibration tool. +""" + +import math +from enum import IntEnum, auto + +from cogip.models import Pose + + +class CalibrationPhaseType(IntEnum): + """Calibration phase types.""" + + WHEEL_DISTANCE = auto() + RIGHT_WHEEL_RADIUS = auto() + LEFT_WHEEL_RADIUS = auto() + + @property + def title(self) -> str: + """Phase title for display.""" + titles = { + CalibrationPhaseType.WHEEL_DISTANCE: "WHEEL DISTANCE CALIBRATION", + CalibrationPhaseType.RIGHT_WHEEL_RADIUS: "RIGHT WHEEL RADIUS CALIBRATION", + CalibrationPhaseType.LEFT_WHEEL_RADIUS: "LEFT WHEEL RADIUS CALIBRATION", + } + return titles[self] + + @property + def description(self) -> str: + """Phase description for display.""" + descriptions = { + CalibrationPhaseType.WHEEL_DISTANCE: "The robot will rotate in place to calibrate the wheel distance", + CalibrationPhaseType.RIGHT_WHEEL_RADIUS: ( + "The robot will perform square trajectories to calibrate the right wheel radius" + ), + CalibrationPhaseType.LEFT_WHEEL_RADIUS: ( + "The robot will travel a straight line to calibrate the left wheel radius" + ), + } + return descriptions[self] + + @property + def input_prompt(self) -> str: + """Input prompt for this phase.""" + prompts = { + CalibrationPhaseType.WHEEL_DISTANCE: "How many full rotations?", + CalibrationPhaseType.RIGHT_WHEEL_RADIUS: "How many squares?", + CalibrationPhaseType.LEFT_WHEEL_RADIUS: "What distance in mm?", + } + return prompts[self] + + @property + def default_value(self) -> int: + """Default value for this phase input.""" + defaults = { + CalibrationPhaseType.WHEEL_DISTANCE: 10, + CalibrationPhaseType.RIGHT_WHEEL_RADIUS: 5, + CalibrationPhaseType.LEFT_WHEEL_RADIUS: 2000, + } + return defaults[self] + + @property + def error_message(self) -> str | None: + """Error message specific to this phase computation.""" + errors = { + CalibrationPhaseType.WHEEL_DISTANCE: None, + CalibrationPhaseType.RIGHT_WHEEL_RADIUS: ( + "Right encoder linear component too small. Check encoder connection." + ), + CalibrationPhaseType.LEFT_WHEEL_RADIUS: "Denominator too small. Check encoder counts.", + } + return errors[self] + + +# Predefined square path for calibration phase 2 (500mm sides, counter-clockwise) +SQUARE_PATH_CCW: tuple[Pose, ...] = ( + Pose(x=500, y=0, O=math.pi / 2), + Pose(x=500, y=500, O=math.pi), + Pose(x=0, y=500, O=-math.pi / 2), + Pose(x=0, y=0, O=0), +) diff --git a/cogip/tools/firmware_parameter_manager/__init__.py b/cogip/tools/firmware_parameter_manager/__init__.py index d5fc9f65e..7a1160306 100644 --- a/cogip/tools/firmware_parameter_manager/__init__.py +++ b/cogip/tools/firmware_parameter_manager/__init__.py @@ -1,3 +1,5 @@ +import logging + from cogip.utils.logger import Logger -logger = Logger("cogip-parameter") +logger = Logger("cogip-firmware-parameter", level=logging.WARNING) diff --git a/cogip/tools/firmware_parameter_manager/__main__.py b/cogip/tools/firmware_parameter_manager/__main__.py index 154f3622d..6bbc15775 100644 --- a/cogip/tools/firmware_parameter_manager/__main__.py +++ b/cogip/tools/firmware_parameter_manager/__main__.py @@ -1,16 +1,23 @@ import asyncio from typing import Annotated +import socketio import typer import yaml -from cogip.models.firmware_parameter import FirmwareParametersGroup -from cogip.tools.copilot.pbcom import PBCom +from cogip.models import FirmwareParametersGroup from cogip.tools.firmware_parameter_manager.firmware_parameter_manager import FirmwareParameterManager -async def main_async(pbcom: PBCom, manager: FirmwareParameterManager): - pbcom_task = asyncio.create_task(pbcom.run()) +async def main_async(server_url: str, parameters_group: FirmwareParametersGroup): + """ + CLI utility to test firmware parameter communication. + Creates its own Socket.IO client to host the FirmwareParameterManager. + """ + sio = socketio.AsyncClient(logger=False) + manager = FirmwareParameterManager(sio=sio, parameter_group=parameters_group) + + await sio.connect(server_url, namespaces=[manager.namespace]) print("Firmware parameter values before reading from firmware:") for param in manager.parameter_group: @@ -18,49 +25,39 @@ async def main_async(pbcom: PBCom, manager: FirmwareParameterManager): print() for param in manager.parameter_group: - await manager.get_parameter_value(param.name) + try: + await manager.get_parameter_value(param.name) + except TimeoutError: + print(f" {param.name:.<40} TIMEOUT") print("Firmware parameter values after reading from firmware:") for param in manager.parameter_group: print(f" {param.name:.<40} {param.value}") print() - try: - pbcom_task.cancel() - await pbcom_task - except asyncio.CancelledError: - pass + manager.cleanup() + await sio.disconnect() def main_opt( *, - can_channel: Annotated[ - str, - typer.Option( - "-cc", - "--can-channel", - help="CAN channel connected to STM32 modules", - envvar="PARAMETER_CAN_CHANNEL", - ), - ] = "can0", - can_bitrate: Annotated[ - int, + server_url: Annotated[ + str | None, typer.Option( - "-b", - "--can-bitrate", - help="CAN bitrate", - envvar="PARAMETER_CAN_BITRATE", + help="Socket.IO Server URL", + envvar="COGIP_SOCKETIO_SERVER_URL", ), - ] = 500000, - can_data_bitrate: Annotated[ + ] = None, + robot_id: Annotated[ int, typer.Option( - "-B", - "--data-bitrate", - help="CAN FD data bitrate", - envvar="PARAMETER_CANFD_DATA_BITRATE", + "-i", + "--id", + min=1, + help="Robot ID.", + envvar=["ROBOT_ID"], ), - ] = 1000000, + ] = 1, parameters: Annotated[ typer.FileText, typer.Option( @@ -71,13 +68,13 @@ def main_opt( ), ], ): + if not server_url: + server_url = f"http://localhost:809{robot_id}" + parameters_data = yaml.safe_load(parameters) parameters_group = FirmwareParametersGroup.model_validate(parameters_data["parameters"]) - pbcom = PBCom(can_channel, can_bitrate, can_data_bitrate, message_handlers={}) - manager = FirmwareParameterManager(pbcom, parameters_group) - - asyncio.run(main_async(pbcom, manager)) + asyncio.run(main_async(server_url, parameters_group)) def main(): diff --git a/cogip/tools/firmware_parameter_manager/firmware_parameter_manager.py b/cogip/tools/firmware_parameter_manager/firmware_parameter_manager.py index dabb4bfa2..f3502a874 100644 --- a/cogip/tools/firmware_parameter_manager/firmware_parameter_manager.py +++ b/cogip/tools/firmware_parameter_manager/firmware_parameter_manager.py @@ -1,96 +1,75 @@ import asyncio -from cogip.models.firmware_parameter import FirmwareParametersGroup -from cogip.protobuf import ( - PB_ParameterGetRequest, - PB_ParameterGetResponse, - PB_ParameterSetRequest, - PB_ParameterSetResponse, -) -from cogip.tools.copilot.pbcom import PBCom, pb_exception_handler -from . import logger +import socketio -# Service: 0x3000 - 0x3FFF -parameter_set_uuid: int = 0x3004 -parameter_set_response_uuid: int = 0x3005 -parameter_get_uuid: int = 0x3006 -parameter_get_response_uuid: int = 0x3007 +from cogip.models import FirmwareParametersGroup +from . import logger +from .sio_events import SioEvents class FirmwareParameterManager: """ - Manager to handle firmware parameter requests/responses via PBCom. + Manager to handle firmware parameter requests/responses via SocketIO through copilot. Uses asyncio.Future to correlate requests with their responses. + + This class is designed to be embedded into another Socket.IO client (e.g., Planner, + Monitor, or any tool that needs firmware parameter access). It registers its own + namespace (/parameters) on the provided client, allowing the host tool to manage + the connection lifecycle while benefiting from firmware parameter functionality. + + Note on concurrent usage: The correlation mechanism uses parameter hashes (FNV-1a) + to match responses to requests. This makes the system naturally resilient to multiple + clients making concurrent requests - each client maintains its own pending requests + dictionary and only resolves its own Futures. However, this usage pattern is not + recommended as it may lead to unpredictable firmware behavior and debugging complexity. + Prefer having a single client handle firmware parameters at a time. """ - def __init__(self, pbcom: PBCom, parameter_group: FirmwareParametersGroup): - self.pbcom = pbcom + def __init__( + self, + sio: socketio.AsyncClient, + parameter_group: FirmwareParametersGroup, + ): + """ + Initialize the FirmwareParameterManager. + + Args: + sio: External Socket.IO client on which to register the /parameters namespace. + The host client is responsible for connection management. + parameter_group: The firmware parameters to manage + """ + self.sio = sio self.parameter_group = parameter_group + self.sio_events = SioEvents(self) + self.sio.register_namespace(self.sio_events) + # Dictionary to store Futures waiting for responses # Key: firmware parameter hash (int), Value: asyncio.Future self.pending_get_requests: dict[int, asyncio.Future] = {} self.pending_set_requests: dict[int, asyncio.Future] = {} - # Register message handlers - self.pbcom.register_message_handler(parameter_get_response_uuid, self._on_get_response) - self.pbcom.register_message_handler(parameter_set_response_uuid, self._on_set_response) - - @pb_exception_handler - async def _on_get_response(self, message: bytes | None = None): - """ - Callback called when receiving a get_firmware_parameter response. - """ - if not message: - logger.warning("Received empty message") - return + @property + def namespace(self) -> str: + """Return the namespace path to include when connecting the host client.""" + return self.sio_events.namespace - response = PB_ParameterGetResponse() - response.ParseFromString(message) - - logger.info(f"Received get_response for firmware parameter hash: 0x{response.key_hash:08x}") - - # Retrieve the Future corresponding to this request - if response.key_hash in self.pending_get_requests: - future = self.pending_get_requests.pop(response.key_hash) - if not future.done(): - future.set_result(response) - else: - logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}") - - @pb_exception_handler - async def _on_set_response(self, message: bytes | None = None): - """ - Callback called when receiving a set_firmware_parameter response. - """ - if not message: - logger.warning("Received empty set_response") - return - - response = PB_ParameterSetResponse() - response.ParseFromString(message) - - logger.info(f"Received set_response for firmware parameter hash: 0x{response.key_hash:08x}") - - # Retrieve the Future corresponding to this request - if response.key_hash in self.pending_set_requests: - future = self.pending_set_requests.pop(response.key_hash) - if not future.done(): - future.set_result(response) - else: - logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}") + @property + def is_connected(self) -> bool: + """Check if the namespace is connected and ready.""" + return self.sio_events.connected async def get_parameter_value( self, parameter_name: str, - timeout: float = 0.1, + timeout: float = 1, ) -> float | int | bool: """ Sends a get_firmware_parameter request and waits for the response. Args: parameter_name: Name of the firmware parameter to retrieve - timeout: Timeout in seconds (default: 0.1) + timeout: Timeout in seconds (default: 1) Returns: The firmware parameter value (float, int, or bool) after pydantic validation @@ -105,15 +84,13 @@ async def get_parameter_value( # Create a Future for this request future = asyncio.Future() + # Store pending request for response correalation parameter_hash = hash(parameter) self.pending_get_requests[parameter_hash] = future try: - # Create and send the request - request = PB_ParameterGetRequest() - parameter.pb_copy(request) - - await self.pbcom.send_can_message(parameter_get_uuid, request) + logger.info(f"[firmware_parameter => server] get: {parameter}") + await self.sio.emit("get_parameter_value", parameter.model_dump(by_alias=True), namespace="/parameters") logger.info(f"Sent get_parameter request for hash: 0x{parameter_hash:08x}") @@ -148,7 +125,7 @@ async def set_parameter_value( self, parameter_name: str, value: int | float | bool, - timeout: float = 0.1, + timeout: float = 1, ) -> None: """ Sends a set_firmware_parameter request and waits for the response. @@ -156,7 +133,7 @@ async def set_parameter_value( Args: parameter_name: Name of the firmware parameter to modify value: Value to set (int, float, or bool) - timeout: Timeout in seconds (default: 0.1) + timeout: Timeout in seconds (default: 1) Returns: True if the firmware parameter was successfully written @@ -175,14 +152,13 @@ async def set_parameter_value( # Create a Future for this request future = asyncio.Future() + # Store pending request for response correalation parameter_hash = hash(parameter) self.pending_set_requests[parameter_hash] = future try: - # Create and send the request - request = PB_ParameterSetRequest(key_hash=parameter_hash) - parameter.pb_copy(request) - await self.pbcom.send_can_message(parameter_set_uuid, request) + logger.info(f"[firmware_parameter => server] set: {parameter}") + await self.sio.emit("set_parameter_value", parameter.model_dump(by_alias=True), namespace="/parameters") logger.info(f"Sent set_parameter request for hash: 0x{parameter_hash:08x}") diff --git a/cogip/tools/firmware_parameter_manager/sio_events.py b/cogip/tools/firmware_parameter_manager/sio_events.py new file mode 100644 index 000000000..cb62e5c66 --- /dev/null +++ b/cogip/tools/firmware_parameter_manager/sio_events.py @@ -0,0 +1,93 @@ +import asyncio +from typing import TYPE_CHECKING, Any + +import polling2 +import socketio +from google.protobuf.json_format import ParseDict + +from cogip.protobuf import ( + PB_ParameterGetResponse, + PB_ParameterSetResponse, +) +from . import logger + +if TYPE_CHECKING: + from .firmware_parameter_manager import FirmwareParameterManager + + +class SioEvents(socketio.AsyncClientNamespace): + """ + Handle all SocketIO events received by FirmwareParameterManager. + """ + + def __init__(self, manager: "FirmwareParameterManager"): + super().__init__("/parameters") + self.manager = manager + self.connected = False + + async def on_connect(self): + """ + On connection to cogip-server. + """ + await asyncio.to_thread( + polling2.poll, + lambda: self.client.connected is True, + step=1, + poll_forever=True, + ) + logger.info("Connected to cogip-server") + await self.emit("connected") + + self.connected = True + + async def on_disconnect(self) -> None: + """ + On disconnection from cogip-server. + """ + logger.info("Disconnected from cogip-server") + self.connected = False + + async def on_connect_error(self, data: dict[str, Any]) -> None: + """ + On connection error, check if a firmware parameter manager is already connected and exit, + or retry connection. + """ + if isinstance(data, dict) and "message" in data: + message = data["message"] + else: + message = data + logger.error(f"Connection to cogip-server failed: {message}") + + async def on_get_parameter_response(self, data: dict[str, Any]): + """ + Handle get_parameter_response from copilot. + Resolve the pending Future with the response. + """ + response = ParseDict(data, PB_ParameterGetResponse()) + + logger.info(f"[SIO] Received get_response for firmware parameter hash: 0x{response.key_hash:08x}") + + # Retrieve the Future corresponding to this request + if response.key_hash in self.manager.pending_get_requests: + future = self.manager.pending_get_requests.pop(response.key_hash) + if not future.done(): + future.set_result(response) + else: + logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}") + + async def on_set_parameter_response(self, data: dict[str, Any]): + """ + Handle set_parameter_response from copilot. + Resolve the pending Future with the response. + """ + response = ParseDict(data, PB_ParameterSetResponse()) + + logger.info(f"[SIO] Received set_response for firmware parameter hash: 0x{response.key_hash:08x}") + + # Retrieve the Future corresponding to this request + if response.key_hash in self.manager.pending_set_requests: + future = self.manager.pending_set_requests.pop(response.key_hash) + if not future.done(): + future.set_result(response) + else: + logger.warning(f"No pending request found for firmware parameter hash: 0x{response.key_hash:08x}") diff --git a/cogip/tools/firmware_telemetry/__init__.py b/cogip/tools/firmware_telemetry/__init__.py new file mode 100644 index 000000000..8818dc544 --- /dev/null +++ b/cogip/tools/firmware_telemetry/__init__.py @@ -0,0 +1,3 @@ +from cogip.utils.logger import Logger + +logger = Logger("cogip-firmware-telemetry") diff --git a/cogip/tools/firmware_telemetry/__main__.py b/cogip/tools/firmware_telemetry/__main__.py new file mode 100644 index 000000000..83ac34577 --- /dev/null +++ b/cogip/tools/firmware_telemetry/__main__.py @@ -0,0 +1,99 @@ +import asyncio +import time +from typing import Annotated + +import socketio +import typer + +from cogip.tools.firmware_telemetry.firmware_telemetry_manager import FirmwareTelemetryManager + + +async def main_async(server_url: str, duration: float): + """ + CLI utility to test firmware telemetry reception. + Creates its own Socket.IO client to host the FirmwareTelemetryManager. + """ + sio = socketio.AsyncClient(logger=False) + telemetry = FirmwareTelemetryManager(sio=sio) + + print(f"Connecting to {server_url}...") + await sio.connect(server_url, namespaces=[telemetry.namespace]) + + print("Enabling telemetry...") + await telemetry.enable() + + print(f"Receiving telemetry data for {duration} seconds...") + print("-" * 60) + + start_time = time.monotonic() + last_print_time = 0.0 + + try: + while (time.monotonic() - start_time) < duration: + await asyncio.sleep(0.1) + + # Print telemetry values every second + current_time = time.monotonic() - start_time + if current_time - last_print_time >= 1.0: + last_print_time = current_time + print(f"\n[{current_time:.1f}s] Telemetry store ({len(telemetry.store)} entries):") + for key_hash, data in telemetry.store.items(): + print(f" hash=0x{key_hash:08x} ts={data.timestamp_ms}ms value={data.value}") + + except KeyboardInterrupt: + print("\nInterrupted by user.") + + print("-" * 60) + print("Disabling telemetry...") + await telemetry.disable() + + await sio.disconnect() + print("Disconnected.") + + +def main_opt( + *, + server_url: Annotated[ + str | None, + typer.Option( + help="Socket.IO Server URL", + envvar="COGIP_SOCKETIO_SERVER_URL", + ), + ] = None, + robot_id: Annotated[ + int, + typer.Option( + "-i", + "--id", + min=1, + help="Robot ID.", + envvar=["ROBOT_ID"], + ), + ] = 1, + duration: Annotated[ + float, + typer.Option( + "-d", + "--duration", + help="Duration in seconds to receive telemetry data.", + ), + ] = 10.0, +): + if not server_url: + server_url = f"http://localhost:809{robot_id}" + + asyncio.run(main_async(server_url, duration)) + + +def main(): + """ + Run firmware telemetry utility. + + During installation of cogip-tools, `setuptools` is configured + to create the `cogip-firmware-telemetry` script using this function as entrypoint. + """ + typer.run(main_opt) + + +if __name__ == "__main__": + main() diff --git a/cogip/tools/firmware_telemetry/firmware_telemetry_manager.py b/cogip/tools/firmware_telemetry/firmware_telemetry_manager.py new file mode 100644 index 000000000..4e5f1367f --- /dev/null +++ b/cogip/tools/firmware_telemetry/firmware_telemetry_manager.py @@ -0,0 +1,92 @@ +import socketio + +from cogip.models import TelemetryData, TelemetryDict +from .sio_events import SioEvents + + +class FirmwareTelemetryManager: + """ + Manager to receive firmware telemetry data via SocketIO through copilot. + + This class is designed to be embedded into another Socket.IO client (e.g., Planner, + Monitor, Calibration tool, or any tool that needs firmware telemetry access). It registers + its own namespace (/telemetry) on the provided client, allowing the host tool to manage + the connection lifecycle while benefiting from telemetry data reception. + + Telemetry data is stored in a TelemetryDict and can be accessed by key name or hash. + + Note on concurrent usage: Multiple clients can receive telemetry data simultaneously + as the server broadcasts to all connected clients. However, having multiple consumers + may lead to debugging complexity. Prefer having a single client handle telemetry at a time. + """ + + def __init__( + self, + sio: socketio.AsyncClient, + ): + """ + Initialize the FirmwareTelemetryManager. + + Args: + sio: External Socket.IO client on which to register the /telemetry namespace. + The host client is responsible for connection management. + """ + self.sio = sio + self.data = TelemetryDict() + + self.sio_events = SioEvents(self) + self.sio.register_namespace(self.sio_events) + + @property + def namespace(self) -> str: + """Return the namespace path to include when connecting the host client.""" + return self.sio_events.namespace + + @property + def is_connected(self) -> bool: + """Check if the namespace is connected and ready.""" + return self.sio_events.connected + + async def enable(self) -> None: + """ + Enable telemetry on the firmware. + Sends telemetry_enable event to copilot via the server. + """ + await self.sio.emit("telemetry_enable", namespace="/telemetry") + + async def disable(self) -> None: + """ + Disable telemetry on the firmware. + Sends telemetry_disable event to copilot via the server. + """ + await self.sio.emit("telemetry_disable", namespace="/telemetry") + + def get_value(self, key: str, default: float | int = 0) -> float | int: + """ + Get the latest telemetry value for a key. + + Args: + key: The telemetry key name. + default: Default value if key not found. + + Returns: + The telemetry value, or default if not found. + """ + if key in self.data: + return self.data[key] + return default + + def get_model(self, key: str) -> TelemetryData: + """ + Get the full telemetry data model for a key. + + Args: + key: The telemetry key name. + + Returns: + TelemetryData containing key_hash, timestamp_ms, and value. + + Raises: + KeyError: If the key is not found in the store. + """ + return self.data.get_model(key) diff --git a/cogip/tools/firmware_telemetry/sio_events.py b/cogip/tools/firmware_telemetry/sio_events.py new file mode 100644 index 000000000..1b15f6fd7 --- /dev/null +++ b/cogip/tools/firmware_telemetry/sio_events.py @@ -0,0 +1,65 @@ +import asyncio +from typing import TYPE_CHECKING, Any + +import polling2 +import socketio +from google.protobuf.json_format import ParseDict + +from cogip.models import TelemetryData +from cogip.protobuf import PB_TelemetryData +from . import logger + +if TYPE_CHECKING: + from .firmware_telemetry_manager import FirmwareTelemetryManager + + +class SioEvents(socketio.AsyncClientNamespace): + """ + Handle all SocketIO events received by FirmwareTelemetryManager. + """ + + def __init__(self, manager: "FirmwareTelemetryManager"): + super().__init__("/telemetry") + self.manager = manager + self.connected = False + + async def on_connect(self): + """ + On connection to cogip-server. + """ + await asyncio.to_thread( + polling2.poll, + lambda: self.client.connected is True, + step=1, + poll_forever=True, + ) + logger.info("Connected to cogip-server") + await self.emit("connected") + + self.connected = True + + async def on_disconnect(self) -> None: + """ + On disconnection from cogip-server. + """ + logger.info("Disconnected from cogip-server") + self.connected = False + + async def on_connect_error(self, data: dict[str, Any]) -> None: + """ + On connection error. + """ + if isinstance(data, dict) and "message" in data: + message = data["message"] + else: + message = data + logger.error(f"Connection to cogip-server failed: {message}") + + async def on_telemetry_data(self, data: dict[str, Any]): + """ + Handle telemetry_data from copilot. + Store the telemetry data point. + """ + telemetry = ParseDict(data, PB_TelemetryData()) + telemetry_data = TelemetryData.from_protobuf(telemetry) + self.manager.store.update(telemetry_data) diff --git a/cogip/tools/server/context.py b/cogip/tools/server/context.py index eb29b8803..6c9b53330 100644 --- a/cogip/tools/server/context.py +++ b/cogip/tools/server/context.py @@ -18,6 +18,7 @@ class Context(metaclass=Singleton): robotcam_sid: Robotcam sid beacon_sid: Beacon server sid monitor_sid: Monitor sid + calibration_sid: Calibration client sid tool_menus: all registered tool menus current_tool_menu: name of the currently selected tool menu shell_menu: last received shell menu @@ -34,6 +35,7 @@ class Context(metaclass=Singleton): robotcam_sid: str | None = None beacon_sid: str | None = None monitor_sid: str | None = None + calibration_sid: str | None = None tool_menus: dict[str, models.ShellMenu] = field(default_factory=dict) current_tool_menu: str | None = None shell_menu: models.ShellMenu | None = None diff --git a/cogip/tools/server/namespaces/__init__.py b/cogip/tools/server/namespaces/__init__.py index 360fede0c..5d0e03179 100644 --- a/cogip/tools/server/namespaces/__init__.py +++ b/cogip/tools/server/namespaces/__init__.py @@ -1,7 +1,10 @@ +from .beacon import BeaconNamespace # noqa from .copilot import CopilotNamespace # noqa from .dashboard import DashboardNamespace # noqa from .detector import DetectorNamespace # noqa +from .firmware_calibration import FirmwareCalibrationNamespace # noqa +from .firmware_parameters import FirmwareParametersNamespace # noqa +from .firmware_telemetry import FirmwareTelemetryNamespace # noqa from .monitor import MonitorNamespace # noqa from .planner import PlannerNamespace # noqa from .robotcam import RobotcamNamespace # noqa -from .beacon import BeaconNamespace # noqa diff --git a/cogip/tools/server/namespaces/copilot.py b/cogip/tools/server/namespaces/copilot.py index 9dc943297..c8c0b30ee 100644 --- a/cogip/tools/server/namespaces/copilot.py +++ b/cogip/tools/server/namespaces/copilot.py @@ -56,6 +56,8 @@ async def on_pose_reached(self, sid) -> None: """ logger.info("[copilot => planner] Pose reached.") await self.emit("pose_reached", namespace="/planner") + if self.context.calibration_sid: + await self.emit("pose_reached", namespace="/calibration") async def on_intermediate_pose_reached(self, sid) -> None: """ @@ -109,3 +111,27 @@ async def on_game_end(self, sid) -> None: """ logger.info("[copilot => planner] Game end.") await self.emit("game_end", namespace="/planner") + + async def on_get_parameter_response(self, sid, response: dict[str, Any]) -> None: + """ + Callback on get_parameter_response message from copilot. + Forward to firmware parameter manager. + """ + logger.info(f"[copilot => parameters] Get response: {response}") + await self.emit("get_parameter_response", response, namespace="/parameters") + + async def on_set_parameter_response(self, sid, response: dict[str, Any]) -> None: + """ + Callback on set_parameter_response message from copilot. + Forward to firmware parameter manager. + """ + logger.info(f"[copilot => parameters] Set response: {response}") + await self.emit("set_parameter_response", response, namespace="/parameters") + + async def on_telemetry_data(self, sid, telemetry: dict[str, Any]) -> None: + """ + Callback on telemetry_data message from copilot. + Forward to telemetry clients. + """ + logger.debug(f"[copilot => telemetry] Telemetry Data: {telemetry}") + await self.emit("telemetry_data", telemetry, namespace="/telemetry") diff --git a/cogip/tools/server/namespaces/firmware_calibration.py b/cogip/tools/server/namespaces/firmware_calibration.py new file mode 100644 index 000000000..b1b73694b --- /dev/null +++ b/cogip/tools/server/namespaces/firmware_calibration.py @@ -0,0 +1,49 @@ +from typing import Any + +import socketio + +from .. import logger, server +from ..context import Context + + +class FirmwareCalibrationNamespace(socketio.AsyncNamespace): + """ + Handle all SocketIO events related to firmware calibration clients. + Only one client connection is allowed at a time. + """ + + def __init__(self, cogip_server: "server.Server"): + super().__init__("/calibration") + self.cogip_server = cogip_server + self.context = Context() + + async def on_connect(self, sid, environ): + if self.context.calibration_sid: + message = "A calibration client is already connected" + logger.error(f"Calibration connection refused: {message}") + raise ConnectionRefusedError(message) + + async def on_connected(self, sid): + logger.info("Calibration client connected.") + self.context.calibration_sid = sid + + async def on_disconnect(self, sid): + self.context.calibration_sid = None + logger.info("Calibration client disconnected.") + + async def on_pose_start(self, sid, pose: dict[str, Any]): + """ + Callback on pose start (reset robot position). + Forward to copilot. + """ + logger.info(f"[calibration => copilot] Pose start: {pose}") + await self.emit("pose_start", pose, namespace="/copilot") + + async def on_pose_order(self, sid, pose: dict[str, Any]): + """ + Callback on pose order. + Forward to pose to copilot and dashboards. + """ + logger.info(f"[calibration => copilot] Pose order: {pose}") + await self.emit("pose_order", pose, namespace="/copilot") + await self.emit("pose_order", (self.context.robot_id, pose), namespace="/dashboard") diff --git a/cogip/tools/server/namespaces/firmware_parameters.py b/cogip/tools/server/namespaces/firmware_parameters.py new file mode 100644 index 000000000..cd95f065e --- /dev/null +++ b/cogip/tools/server/namespaces/firmware_parameters.py @@ -0,0 +1,42 @@ +from typing import Any + +import socketio + +from .. import logger, server + + +class FirmwareParametersNamespace(socketio.AsyncNamespace): + """ + Handle all SocketIO events related to firmware parameter clients. + Supports multiple simultaneous client connections. + """ + + def __init__(self, cogip_server: "server.Server"): + super().__init__("/parameters") + self.cogip_server = cogip_server + + async def on_connect(self, sid, environ): + """Allow multiple client connections.""" + pass + + async def on_connected(self, sid): + logger.info(f"Parameter client connected: {sid}") + + async def on_disconnect(self, sid): + logger.info(f"Parameter client disconnected: {sid}") + + async def on_get_parameter_value(self, sid, data: dict[str, Any]): + """ + Callback on get_parameter_value message. + Forward to copilot. + """ + logger.info(f"[parameters => copilot] Get parameter: {data}") + await self.emit("get_parameter_value", data, namespace="/copilot") + + async def on_set_parameter_value(self, sid, data: dict[str, Any]): + """ + Callback on set_parameter_value message. + Forward to copilot. + """ + logger.info(f"[parameters => copilot] Set parameter: {data}") + await self.emit("set_parameter_value", data, namespace="/copilot") diff --git a/cogip/tools/server/namespaces/firmware_telemetry.py b/cogip/tools/server/namespaces/firmware_telemetry.py new file mode 100644 index 000000000..75a4cc8fa --- /dev/null +++ b/cogip/tools/server/namespaces/firmware_telemetry.py @@ -0,0 +1,42 @@ +from typing import Any + +import socketio + +from .. import logger, server + + +class FirmwareTelemetryNamespace(socketio.AsyncNamespace): + """ + Handle all SocketIO events related to firmware telemetry clients. + Supports multiple simultaneous client connections. + """ + + def __init__(self, cogip_server: "server.Server"): + super().__init__("/telemetry") + self.cogip_server = cogip_server + + async def on_connect(self, sid, environ): + """Allow multiple client connections.""" + pass + + async def on_connected(self, sid): + logger.info(f"Telemetry client connected: {sid}") + + async def on_disconnect(self, sid): + logger.info(f"Telemetry client disconnected: {sid}") + + async def on_telemetry_enable(self, sid, data: dict[str, Any] | None = None): + """ + Callback on telemetry_enable message. + Forward to copilot. + """ + logger.info("[telemetry => copilot] Telemetry enable") + await self.emit("telemetry_enable", data or {}, namespace="/copilot") + + async def on_telemetry_disable(self, sid, data: dict[str, Any] | None = None): + """ + Callback on telemetry_disable message. + Forward to copilot. + """ + logger.info("[telemetry => copilot] Telemetry disable") + await self.emit("telemetry_disable", data or {}, namespace="/copilot") diff --git a/cogip/tools/server/server.py b/cogip/tools/server/server.py index 00127d4a4..b7d302db0 100644 --- a/cogip/tools/server/server.py +++ b/cogip/tools/server/server.py @@ -88,6 +88,9 @@ def __init__(self): self.sio.register_namespace(namespaces.PlannerNamespace(self)) self.sio.register_namespace(namespaces.RobotcamNamespace(self)) self.sio.register_namespace(namespaces.BeaconNamespace(self)) + self.sio.register_namespace(namespaces.FirmwareParametersNamespace(self)) + self.sio.register_namespace(namespaces.FirmwareTelemetryNamespace(self)) + self.sio.register_namespace(namespaces.FirmwareCalibrationNamespace(self)) self.dashboard_updater_loop.start() diff --git a/cogip/utils/console_ui.py b/cogip/utils/console_ui.py new file mode 100644 index 000000000..4c60623c9 --- /dev/null +++ b/cogip/utils/console_ui.py @@ -0,0 +1,489 @@ +""" +Generic Console UI components using Rich. + +Provides a themed console interface with consistent styling +for interactive CLI tools. +""" + +from __future__ import annotations +import asyncio +from typing import Any + +from rich.box import ROUNDED +from rich.console import Console +from rich.panel import Panel +from rich.progress import BarColumn, Progress, ProgressColumn, SpinnerColumn, TaskID, TextColumn, TimeElapsedColumn +from rich.prompt import Confirm, FloatPrompt, IntPrompt, Prompt +from rich.rule import Rule +from rich.table import Table +from rich.theme import Theme + + +class CustomProgressTracker: + """ + Progress tracker with manual lifecycle and custom columns/fields. + + Features: + - Manual start/update/stop control + - Custom Rich columns + - Custom task fields that can be updated + + Example: + tracker = console.create_progress_tracker(columns=[ + SpinnerColumn(), + TextColumn("{task.description}"), + BarColumn(), + TextColumn("Status: {task.fields[status]}"), + ]) + tracker.start("Processing", total=100, status="starting") + for i in range(100): + tracker.update(completed=i, status=f"item {i}") + tracker.stop() + """ + + def __init__( + self, + console: Console, + columns: list[ProgressColumn] | None = None, + ): + """ + Initialize the progress tracker. + + Args: + console: Console instance for output + columns: Custom Rich Progress columns. If None, uses default columns. + """ + self._console = console + self._columns = columns or [ + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + ] + self._progress: Progress | None = None + self._task_id: TaskID | None = None + + def start( + self, + description: str, + total: float = 100, + **fields: float, + ) -> None: + """ + Start the progress display. + + Args: + description: Task description + total: Total value for 100% completion (default: 100 for percentage) + **fields: Custom fields to display + """ + if self._progress is not None: + raise RuntimeError("Progress already started. Call stop() first.") + + self._progress = Progress(*self._columns, console=self._console) + self._progress.start() + self._task_id = self._progress.add_task(description, total=total, **fields) + + def update( + self, + *, + completed: float | None = None, + description: str | None = None, + **fields: float, + ) -> None: + """ + Update progress state. + + Args: + completed: Current completion value (0 to total) + description: New description (optional) + **fields: Updated custom field values + """ + if self._progress is None or self._task_id is None: + return + + self._progress.update( + self._task_id, + completed=completed, + description=description, + **fields, + ) + + def stop(self, complete: bool = True) -> None: + """ + Stop the progress display. + + Args: + complete: If True, set progress to 100% before stopping + """ + if self._progress is None: + return + + if complete and self._task_id is not None: + self._progress.update(self._task_id, completed=self._progress.tasks[self._task_id].total) + + self._progress.stop() + self._progress = None + self._task_id = None + + @property + def is_active(self) -> bool: + """Check if progress is currently active.""" + return self._progress is not None + + +DEFAULT_THEME = Theme( + { + # Primary styles - using Rich default colors + "header": "bold cyan", + "phase": "bold bright_white", + "value": "bold bright_cyan", + "label": "white", + # Status styles + "success": "bold green", + "warning": "bold yellow", + "error": "bold red", + "info": "bright_blue", + # Utility + "muted": "dim", + "prompt": "bold yellow", + } +) + + +class ConsoleUI(Console): + """Generic console UI with themed output and async input methods.""" + + def __init__(self, theme: Theme | None = None): + """ + Initialize the console UI. + + Args: + theme: Custom Rich theme. If None, uses DEFAULT_THEME. + """ + super().__init__(theme=theme or DEFAULT_THEME) + + def show_panel( + self, + content: str, + *, + title: str | None = None, + subtitle: str | None = None, + border_style: str = "header", + ) -> None: + """ + Display a styled panel. + + Args: + content: Panel content text + title: Optional panel title + subtitle: Optional panel subtitle + border_style: Border style name from theme + """ + self.print( + Panel( + content, + title=f"[header]{title}[/]" if title else None, + subtitle=f"[muted]{subtitle}[/]" if subtitle else None, + border_style=border_style, + box=ROUNDED, + padding=(1, 2), + ) + ) + + def show_rule(self, title: str, *, style: str = "muted", title_style: str = "phase") -> None: + """ + Display a horizontal rule with title. + + Args: + title: Rule title text + style: Line style name from theme + title_style: Title style name from theme + """ + self.print() + self.print(Rule(f"[{title_style}]{title}[/]", style=style)) + self.print() + + def show_success(self, message: str) -> None: + """Display a success message.""" + self.print(f"[success]\u2713 {message}[/]") + + def show_warning(self, message: str) -> None: + """Display a warning message.""" + self.print(f"[warning]\u26a0 {message}[/]") + + def show_error(self, message: str) -> None: + """Display an error message.""" + self.print(f"[error]\u2717 {message}[/]") + + def show_info(self, message: str) -> None: + """Display an info message.""" + self.print(f"[muted]\u2139 {message}[/]") + + def create_table( + self, + title: str | None = None, + columns: list[tuple[str, dict[str, Any]]] | None = None, + ) -> Table: + """Create a styled table. + + Args: + title: Optional table title + columns: List of (name, kwargs) tuples for columns. + kwargs are passed to add_column(). + + Returns: + Configured Table instance ready for add_row() calls. + """ + table = Table( + title=f"[phase]{title}[/]" if title else None, + box=ROUNDED, + border_style="muted", + header_style="header", + show_header=True, + padding=(0, 1), + ) + if columns: + for name, kwargs in columns: + table.add_column(name, **kwargs) + return table + + def show_key_value_table( + self, + data: list[tuple[str, str]], + title: str | None = None, + key_header: str = "Parameter", + value_header: str = "Value", + ) -> None: + """Display a simple key-value table. + + Args: + data: List of (key, value) tuples + title: Optional table title + key_header: Header for key column + value_header: Header for value column + """ + table = self.create_table( + title=title, + columns=[ + (key_header, {"style": "label"}), + (value_header, {"style": "value", "justify": "right"}), + ], + ) + for key, value in data: + table.add_row(key, value) + self.print(table) + + def show_comparison_table( + self, + data: list[tuple[str, str, str]], + title: str | None = None, + key_header: str = "Parameter", + before_header: str = "Previous", + after_header: str = "New", + ) -> None: + """Display a before/after comparison table. + + Args: + data: List of (key, before_value, after_value) tuples + title: Optional table title + key_header: Header for key column + before_header: Header for before column + after_header: Header for after column + """ + table = self.create_table( + title=title, + columns=[ + (key_header, {"style": "label"}), + (before_header, {"style": "muted"}), + (after_header, {"style": "value"}), + ], + ) + for key, before, after in data: + table.add_row(key, before, after) + self.print(table) + + async def get_string(self, prompt: str, *, default: str | None = None) -> str: + """ + Get string input from user. + + Args: + prompt: Prompt message to display + default: Default value. If None, input is required. + """ + kwargs: dict[str, Any] = {"console": self} + if default is not None: + kwargs["default"] = default + return await asyncio.to_thread(lambda: Prompt.ask(f"[prompt]{prompt}[/]", **kwargs)) + + async def get_integer(self, prompt: str, *, default: int | None = None) -> int: + """ + Get integer input from user. + + Displays a confirmation message with the chosen value. + + Args: + prompt: Prompt message to display + default: Default value. If None, input is required. + """ + kwargs: dict[str, Any] = {"console": self} + if default is not None: + kwargs["default"] = default + value = await asyncio.to_thread(lambda: IntPrompt.ask(f"[prompt]{prompt}[/]", **kwargs)) + self.show_info(f"Chosen: {value}") + return value + + async def get_float(self, prompt: str, *, default: float | None = None) -> float: + """ + Get float input from user. + + Args: + prompt: Prompt message to display + default: Default value. If None, input is required. + """ + kwargs: dict[str, Any] = {"console": self} + if default is not None: + kwargs["default"] = default + return await asyncio.to_thread(lambda: FloatPrompt.ask(f"[prompt]{prompt}[/]", **kwargs)) + + async def confirm(self, message: str, *, default: bool = True) -> bool: + """ + Ask for confirmation. + + Args: + message: Confirmation message to display + default: Default value when user presses Enter + """ + return await asyncio.to_thread(lambda: Confirm.ask(f"[header]{message}[/]", default=default, console=self)) + + async def wait_for_enter(self, message: str) -> None: + """ + Wait for user to press Enter. + + Args: + message: Message to display before waiting + """ + await asyncio.to_thread(lambda: self.input(f"[prompt]{message}[/] [muted][[Enter]][/] ")) + + def create_progress_tracker( + self, + columns: list[ProgressColumn] | None = None, + ) -> CustomProgressTracker: + """ + Create a progress tracker with manual lifecycle control. + + Features: + - Custom display columns + - Custom updatable fields + - Manual start/update/stop control + + Args: + columns: Custom Rich Progress columns. If None, uses default columns + (spinner, description, bar, percentage, elapsed time). + + Returns: + CustomProgressTracker instance (call .start() to begin tracking) + """ + return CustomProgressTracker(self, columns) + + +if __name__ == "__main__": + import time + + async def demo() -> None: + """Demonstrate ConsoleUI features.""" + ui = ConsoleUI() + + # Welcome panel + ui.show_panel( + "This demo showcases all features of the ConsoleUI class.\n" + "Follow the prompts to see each feature in action.", + title="Console UI Demo", + subtitle="Interactive demonstration", + ) + + # Status messages + ui.show_rule("Status Messages") + ui.show_success("Operation completed successfully") + ui.show_warning("This is a warning message") + ui.show_error("This is an error message") + ui.show_info("This is an informational message") + + # Key-value table + ui.show_rule("Key-Value Table") + ui.show_key_value_table( + [ + ("Robot ID", "1"), + ("Wheel diameter", "60.0 mm"), + ("Encoder resolution", "4096 ticks"), + ("Max speed", "1000 mm/s"), + ], + title="Robot Configuration", + ) + + # Comparison table + ui.show_rule("Comparison Table") + ui.show_comparison_table( + [ + ("Left wheel", "59.8 mm", "60.2 mm"), + ("Right wheel", "60.1 mm", "59.9 mm"), + ("Track width", "280.0 mm", "281.5 mm"), + ], + title="Calibration Results", + before_header="Before", + after_header="After", + ) + + # Custom table + ui.show_rule("Custom Table") + table = ui.create_table( + title="Sensor Readings", + columns=[ + ("Sensor", {"style": "label"}), + ("Value", {"style": "value", "justify": "right"}), + ("Status", {"style": "success", "justify": "center"}), + ], + ) + table.add_row("Temperature", "25.3°C", "OK") + table.add_row("Battery", "12.4V", "OK") + table.add_row("Distance", "150 mm", "OK") + ui.print(table) + + # Interactive inputs + ui.show_rule("Interactive Inputs") + + robot_name = await ui.get_string("Robot name", default="PAMI") + ui.show_info(f"Configuring robot: {robot_name}") + + robot_id = await ui.get_integer("Robot ID (1-4)", default=1) + ui.show_info(f"Robot ID set to: {robot_id}") + + wheel_diameter = await ui.get_float("Wheel diameter (mm)", default=60.0) + ui.show_info(f"Wheel diameter: {wheel_diameter:.1f} mm") + + confirmed = await ui.confirm("Do you want to continue?", default=True) + if confirmed: + ui.show_success("Continuing...") + else: + ui.show_warning("Cancelled by user") + + # Progress tracker demo + ui.show_rule("Progress Tracker Demo") + tracker = ui.create_progress_tracker() + tracker.start("[info]Processing items[/]", total=10) + for i in range(10): + time.sleep(0.3) + tracker.update(completed=i + 1) + tracker.stop() + ui.show_success("Progress tracker demo complete") + + # Final panel + await ui.wait_for_enter("Press Enter to exit") + ui.show_panel( + "[success]All features demonstrated successfully![/]", + title="Demo Complete", + border_style="success", + ) + + asyncio.run(demo()) diff --git a/cogip/utils/fnv1a.py b/cogip/utils/fnv1a.py new file mode 100644 index 000000000..c8671f10f --- /dev/null +++ b/cogip/utils/fnv1a.py @@ -0,0 +1,34 @@ +"""FNV-1a hash implementation for firmware key hashing.""" + +from functools import cache + + +@cache +def fnv1a_hash(string: str) -> int: + """ + Compute FNV-1a 32-bit hash of a string. + + This matches the firmware's hash implementation used for parameter + and telemetry keys. Results are cached to avoid recomputing hashes + for previously seen strings. + + Args: + string: The string to hash. + + Returns: + 32-bit FNV-1a hash value. + + Example: + >>> hex(fnv1a_hash("parameter")) + '0x100b' + """ + FNV_OFFSET_BASIS = 0x811C9DC5 + FNV_PRIME = 0x01000193 + + hash_value = FNV_OFFSET_BASIS + + for byte in string.encode("utf-8"): + hash_value ^= byte + hash_value = (hash_value * FNV_PRIME) & 0xFFFFFFFF + + return hash_value diff --git a/pyproject.toml b/pyproject.toml index a25eac7bc..e901c7f20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ dependencies = [ "scikit-learn~=1.6.1", "watchfiles==0.24.0", "websocket-client==1.7.0", + "rich==14.2.0", ] [dependency-groups] @@ -91,6 +92,8 @@ cogip-cpp-shm-example = "cogip.tools.cpp_shm_example.__main__:main" cogip-scservo = "cogip.tools.scservo.__main__:main" cogip-mcu-logger = "cogip.tools.mcu_logger.__main__:main" cogip-firmware-parameter-manager = "cogip.tools.firmware_parameter_manager.__main__:main" +cogip-firmware-telemetry = "cogip.tools.firmware_telemetry.__main__:main" +cogip-odometry-calibration = "cogip.tools.firmware_odometry_calibration.__main__:main" [tool.uv] python-preference = "only-managed"