Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cogip/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
from .firmware_parameter import ( # noqa
FirmwareParameter,
FirmwareParameterNotFound,
FirmwareParametersGroup,
FirmwareParameterValidationFailed,
)
from .firmware_telemetry import ( # noqa
TelemetryData,
TelemetryDict,
TelemetryValue,
)
from .models import ( # noqa
CameraExtrinsicParameters,
DynObstacle,
Expand All @@ -12,3 +23,9 @@
ShellMenu,
Vertex,
)
from .odometry_calibration import ( # noqa
CalibrationResult,
CalibrationState,
EncoderDeltas,
OdometryParameters,
)
27 changes: 1 addition & 26 deletions cogip/models/firmware_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PB_ParameterSetResponse,
PB_ParameterStatus,
)
from cogip.utils.fnv1a import fnv1a_hash


class FirmwareParameterValidationFailed(Exception):
Expand All @@ -41,32 +42,6 @@ class FirmwareParameterNotFound(Exception):
pass


def fnv1a_hash(string: str) -> int:
"""Compute FNV-1a hash of a string.

Args:
string: The string to hash

Returns:
The 32-bit hash value as an unsigned integer

Example:
>>> hex(fnv1a_hash("parameter"))
'0x100b'
"""
# FNV-1a constants
FNV_OFFSET_BASIS = 0x811C9DC5
FNV_PRIME = 0x01000193

hash_value = FNV_OFFSET_BASIS

for byte in string.encode("utf-8"):
hash_value ^= byte
hash_value = (hash_value * FNV_PRIME) & 0xFFFFFFFF # Keep it 32-bit

return hash_value


class FirmwareParameterBase(BaseModel):
"""Base firmware parameter type"""

Expand Down
132 changes: 132 additions & 0 deletions cogip/models/firmware_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
Firmware Telemetry Models.

This module provides models for parsing generic telemetry data received
from the robot's MCU firmware via Protobuf messages using FNV-1a key hashes.
"""

from pydantic import BaseModel, ConfigDict

from cogip.protobuf import PB_TelemetryData
from cogip.utils.fnv1a import fnv1a_hash

# Discriminated union of all firmware telemetry value types
TelemetryValue = float | int


class TelemetryData(BaseModel):
"""
Generic telemetry data point with key hash, timestamp, and value.

Attributes:
key_hash: FNV-1a hash of the telemetry key.
timestamp_ms: Timestamp in milliseconds when the data was captured.
value: The telemetry value (float or int depending on type).
"""

model_config = ConfigDict(validate_assignment=True)

key_hash: int
timestamp_ms: int
value: TelemetryValue

@classmethod
def from_protobuf(cls, message: PB_TelemetryData) -> "TelemetryData":
"""
Parse a TelemetryData from a PB_TelemetryData protobuf message.

Args:
message: The PB_TelemetryData protobuf message.

Returns:
TelemetryData instance with parsed values.
"""
key_hash = message.key_hash
timestamp_ms = message.timestamp_ms

# Extract the value from the oneof field
which_value = message.WhichOneof("value")
if which_value is None:
value: TelemetryValue = 0
else:
value = getattr(message, which_value)

return cls(key_hash=key_hash, timestamp_ms=timestamp_ms, value=value)


class TelemetryDict:
"""
Dict-like store for telemetry data points indexed by key hash.

This class collects telemetry data and provides access by key name.
"""

def __init__(self):
self._data: dict[int, TelemetryData] = {}

def update(self, data: TelemetryData) -> None:
"""
Update the store with a new telemetry data point.

Args:
data: The telemetry data point to store.
"""
self._data[data.key_hash] = data

def get_model(self, key: str) -> TelemetryData:
"""
Get telemetry data model by key name.

Args:
key: The telemetry key name to look up.

Returns:
TelemetryData for the key.

Raises:
KeyError: If the key is not found in the store.
"""
return self._data[fnv1a_hash(key)]

def __getitem__(self, key: str) -> TelemetryValue:
"""
Get telemetry value by key name. Raises KeyError if not found.

Args:
key: The telemetry key name to look up.

Returns:
The telemetry value for the key.

Raises:
KeyError: If the key is not found in the store.
"""
return self._data[fnv1a_hash(key)].value

def __contains__(self, key: str) -> bool:
"""
Check if a key exists in the store.

Args:
key: The telemetry key name to check.

Returns:
True if the key exists, False otherwise.
"""
return fnv1a_hash(key) in self._data

def __len__(self) -> int:
"""Return the number of telemetry entries."""
return len(self._data)

def __bool__(self) -> bool:
"""Return True if the store contains any data."""
return bool(self._data)

def items(self):
"""Iterate over (key_hash, TelemetryData) tuples."""
return self._data.items()

def values(self):
"""Iterate over TelemetryData values."""
return self._data.values()
49 changes: 49 additions & 0 deletions cogip/models/odometry_calibration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Odometry Calibration Models

Pydantic models for odometry calibration data.
"""

from pydantic import BaseModel


class CalibrationResult(BaseModel):
"""Result of a calibration computation."""

wheels_distance: float
right_wheel_radius: float
left_wheel_radius: float


class CalibrationState(BaseModel):
"""
State container for calibration intermediate values.

Tracks alpha and beta coefficients computed during calibration phases.
"""

alpha_l: float = 0.0
alpha_r: float = 0.0
beta: float = 0.0


class EncoderDeltas(BaseModel):
"""Encoder tick deltas captured during a motion sequence."""

left: int
right: int


class OdometryParameters(BaseModel):
"""
Container for odometry parameters.

Holds all parameters needed for odometry calibration.
"""

wheels_distance: float = 0.0
right_wheel_radius: float = 0.0
left_wheel_radius: float = 0.0
left_polarity: float = 0.0
right_polarity: float = 0.0
encoder_ticks: float = 0.0
47 changes: 29 additions & 18 deletions cogip/protobuf/PB_ParameterCommands_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 29 additions & 26 deletions cogip/protobuf/PB_ParameterCommands_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,60 @@ from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor
NOT_FOUND: PB_ParameterStatus

class PB_ParameterStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
SUCCESS: _ClassVar[PB_ParameterStatus]
VALIDATION_FAILED: _ClassVar[PB_ParameterStatus]
NOT_FOUND: _ClassVar[PB_ParameterStatus]
SUCCESS: PB_ParameterStatus
VALIDATION_FAILED: PB_ParameterStatus
NOT_FOUND: PB_ParameterStatus

class PB_ParameterValue(_message.Message):
__slots__ = ("float_value", "double_value", "int32_value", "uint32_value", "int64_value", "uint64_value", "bool_value")
FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int]
DOUBLE_VALUE_FIELD_NUMBER: _ClassVar[int]
INT32_VALUE_FIELD_NUMBER: _ClassVar[int]
UINT32_VALUE_FIELD_NUMBER: _ClassVar[int]
INT64_VALUE_FIELD_NUMBER: _ClassVar[int]
UINT64_VALUE_FIELD_NUMBER: _ClassVar[int]
BOOL_VALUE_FIELD_NUMBER: _ClassVar[int]
float_value: float
double_value: float
int32_value: int
uint32_value: int
int64_value: int
uint64_value: int
bool_value: bool
def __init__(self, float_value: _Optional[float] = ..., double_value: _Optional[float] = ..., int32_value: _Optional[int] = ..., uint32_value: _Optional[int] = ..., int64_value: _Optional[int] = ..., uint64_value: _Optional[int] = ..., bool_value: bool = ...) -> None: ...

class PB_ParameterGetRequest(_message.Message):
__slots__ = ["key_hash"]
__slots__ = ("key_hash",)
KEY_HASH_FIELD_NUMBER: _ClassVar[int]
key_hash: int
def __init__(self, key_hash: _Optional[int] = ...) -> None: ...

class PB_ParameterGetResponse(_message.Message):
__slots__ = ["key_hash", "value"]
__slots__ = ("key_hash", "value")
KEY_HASH_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key_hash: int
value: PB_ParameterValue
def __init__(self, key_hash: _Optional[int] = ..., value: _Optional[_Union[PB_ParameterValue, _Mapping]] = ...) -> None: ...

class PB_ParameterSetRequest(_message.Message):
__slots__ = ["key_hash", "value"]
__slots__ = ("key_hash", "value")
KEY_HASH_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key_hash: int
value: PB_ParameterValue
def __init__(self, key_hash: _Optional[int] = ..., value: _Optional[_Union[PB_ParameterValue, _Mapping]] = ...) -> None: ...

class PB_ParameterSetResponse(_message.Message):
__slots__ = ["key_hash", "status"]
__slots__ = ("key_hash", "status")
KEY_HASH_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
key_hash: int
status: PB_ParameterStatus
def __init__(self, key_hash: _Optional[int] = ..., status: _Optional[_Union[PB_ParameterStatus, str]] = ...) -> None: ...

class PB_ParameterValue(_message.Message):
__slots__ = ["bool_value", "double_value", "float_value", "int32_value", "int64_value", "uint32_value", "uint64_value"]
BOOL_VALUE_FIELD_NUMBER: _ClassVar[int]
DOUBLE_VALUE_FIELD_NUMBER: _ClassVar[int]
FLOAT_VALUE_FIELD_NUMBER: _ClassVar[int]
INT32_VALUE_FIELD_NUMBER: _ClassVar[int]
INT64_VALUE_FIELD_NUMBER: _ClassVar[int]
UINT32_VALUE_FIELD_NUMBER: _ClassVar[int]
UINT64_VALUE_FIELD_NUMBER: _ClassVar[int]
bool_value: bool
double_value: float
float_value: float
int32_value: int
int64_value: int
uint32_value: int
uint64_value: int
def __init__(self, float_value: _Optional[float] = ..., double_value: _Optional[float] = ..., int32_value: _Optional[int] = ..., uint32_value: _Optional[int] = ..., int64_value: _Optional[int] = ..., uint64_value: _Optional[int] = ..., bool_value: bool = ...) -> None: ...

class PB_ParameterStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
Loading