Skip to content

Commit 315ce0f

Browse files
yudataguyineskhounateinaction
authored
Bit Packing (#302)
Co-authored-by: ineskhou <[email protected]> Co-authored-by: Nate Gay <[email protected]> Co-authored-by: Nate Gay <[email protected]>
1 parent 07c921d commit 315ce0f

File tree

4 files changed

+1528
-45
lines changed

4 files changed

+1528
-45
lines changed

circuitpython-workspaces/flight-software/src/pysquared/beacon.py

Lines changed: 227 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from microcontroller import Processor
2222

23+
from .binary_encoder import BinaryDecoder, BinaryEncoder
2324
from .hardware.radio.packetizer.packet_manager import PacketManager
2425
from .logger import Logger
2526
from .nvm.counter import Counter
@@ -83,15 +84,112 @@ def send(self) -> bool:
8384
Returns:
8485
True if the beacon was sent successfully, False otherwise.
8586
"""
86-
state = self._build_beacon_state()
87-
beacon_data = json.dumps(state, separators=(",", ":")).encode("utf-8")
88-
return self._packet_manager.send(beacon_data)
87+
state = self._build_state()
88+
# Use binary encoding for efficiency
89+
b = self._encode_binary_state(state)
90+
return self._packet_manager.send(b)
8991

90-
def _build_beacon_state(self) -> OrderedDict[str, object]:
91-
"""Builds the beacon state dictionary with system info and sensor data.
92+
def _encode_binary_state(self, state: OrderedDict[str, object]) -> bytes:
93+
"""Encode the state dictionary using binary encoding for efficiency.
94+
95+
Uses explicit encoding based on known beacon data structure for safety and performance.
96+
97+
Args:
98+
state: The state dictionary to encode
9299
93100
Returns:
94-
OrderedDict containing the beacon state data.
101+
Binary encoded data
102+
"""
103+
encoder = BinaryEncoder()
104+
105+
for key, value in state.items():
106+
self._encode_known_value(encoder, key, value)
107+
108+
return encoder.to_bytes()
109+
110+
def _encode_known_value(
111+
self, encoder: BinaryEncoder, key: str, value: object
112+
) -> None:
113+
"""Encode a value based on its actual type.
114+
115+
This method uses direct type checking for cleaner and more reliable encoding
116+
without relying on key name patterns.
117+
118+
Args:
119+
encoder: The binary encoder to add data to
120+
key: The key name for the value
121+
value: The value to encode
122+
"""
123+
if isinstance(value, dict):
124+
self._encode_sensor_dict(encoder, key, value)
125+
elif isinstance(value, (list, tuple)):
126+
if len(value) == 3 and all(isinstance(v, (int, float)) for v in value):
127+
# Handle 3D vectors (acceleration, gyroscope) by splitting into components
128+
for i, v in enumerate(value):
129+
encoder.add_float(f"{key}_{i}", float(v))
130+
else:
131+
# Non-numeric or non-3D arrays as strings
132+
encoder.add_string(key, str(value))
133+
elif isinstance(value, bool):
134+
encoder.add_int(key, int(value))
135+
elif isinstance(value, int):
136+
encoder.add_int(key, value)
137+
elif isinstance(value, float):
138+
encoder.add_float(key, value)
139+
else:
140+
# Fallback for all other types (strings, etc.)
141+
encoder.add_string(key, str(value))
142+
143+
def _safe_float_convert(self, value: object) -> float:
144+
"""Safely convert a value to float with proper type checking.
145+
146+
Args:
147+
value: The value to convert to float
148+
149+
Returns:
150+
Float representation of the value
151+
152+
Raises:
153+
ValueError: If the value cannot be converted to float
154+
"""
155+
if isinstance(value, (int, float)):
156+
return float(value)
157+
elif isinstance(value, str):
158+
return float(value)
159+
else:
160+
raise ValueError(f"Cannot convert {type(value).__name__} to float: {value}")
161+
162+
def _encode_sensor_dict(
163+
self, encoder: BinaryEncoder, key: str, sensor_data: dict
164+
) -> None:
165+
"""Encode sensor data dictionary with known structure.
166+
167+
Handles sensor readings that come from to_dict() methods with known structure
168+
like {timestamp: float, value: float|list}.
169+
170+
Args:
171+
encoder: The binary encoder to add data to
172+
key: The base key name
173+
sensor_data: Dictionary containing sensor readings
174+
"""
175+
for dict_key, dict_value in sensor_data.items():
176+
full_key = f"{key}_{dict_key}"
177+
if dict_key == "timestamp":
178+
encoder.add_float(full_key, self._safe_float_convert(dict_value))
179+
elif isinstance(dict_value, (list, tuple)) and len(dict_value) == 3:
180+
# Handle 3D vectors (acceleration, gyroscope)
181+
for i, v in enumerate(dict_value):
182+
encoder.add_float(f"{full_key}_{i}", self._safe_float_convert(v))
183+
elif isinstance(dict_value, (int, float)):
184+
encoder.add_float(full_key, float(dict_value))
185+
else:
186+
encoder.add_string(full_key, str(dict_value))
187+
188+
def _build_state(self) -> OrderedDict[str, object]:
189+
"""Build the beacon state dictionary from sensors.
190+
191+
Returns:
192+
OrderedDict containing all beacon data
95193
"""
96194
state: OrderedDict[str, object] = OrderedDict()
97195
self._add_system_info(state)
@@ -259,3 +357,126 @@ def _safe_add_sensor_reading(
259357
state[key] = reading_func()
260358
except Exception as e:
261359
self._log.error(error_msg, e, sensor=sensor_name, index=index)
360+
361+
def send_json(self) -> bool:
362+
"""Sends the beacon using JSON encoding (legacy method).
363+
364+
Returns:
365+
True if the beacon was sent successfully, False otherwise.
366+
"""
367+
state = self._build_state()
368+
b = json.dumps(state, separators=(",", ":")).encode("utf-8")
369+
return self._packet_manager.send(b)
370+
371+
@staticmethod
372+
def decode_binary_beacon(data: bytes, key_map: dict | None = None) -> dict:
373+
"""Decode binary beacon data received from another satellite.
374+
375+
Args:
376+
data: Binary encoded beacon data
377+
key_map: Optional key mapping for decoding (hash -> key name)
378+
379+
Returns:
380+
Dictionary containing decoded beacon data
381+
"""
382+
decoder = BinaryDecoder(data, key_map)
383+
return decoder.get_all()
384+
385+
def generate_key_mapping(self) -> dict:
386+
"""Create a key mapping for this beacon's data structure.
387+
388+
This method generates a template beacon packet and returns the key mapping
389+
that can be used to decode binary beacon data with the same structure.
390+
391+
Returns:
392+
Dictionary mapping key hashes to key names
393+
"""
394+
# Create a template state to get the key structure
395+
state = self._build_template_state()
396+
397+
# Encode to get key mapping
398+
encoder = BinaryEncoder()
399+
for key, value in state.items():
400+
if isinstance(value, str):
401+
encoder.add_string(key, value)
402+
elif isinstance(value, float):
403+
encoder.add_float(key, value)
404+
elif isinstance(value, int):
405+
encoder.add_int(key, value)
406+
elif isinstance(value, bool):
407+
encoder.add_int(key, int(value), size=1)
408+
409+
# Generate the binary data to populate key map
410+
encoder.to_bytes()
411+
return encoder.get_key_map()
412+
413+
def _build_template_state(self) -> OrderedDict[str, object]:
414+
"""Build a template state dictionary for key mapping.
415+
416+
Returns:
417+
OrderedDict containing template beacon data with the same structure
418+
"""
419+
state: OrderedDict[str, object] = OrderedDict()
420+
self._add_template_system_info(state)
421+
self._add_template_sensor_data(state)
422+
return state
423+
424+
def _add_template_system_info(self, state: OrderedDict[str, object]) -> None:
425+
"""Add template system information to the state dictionary.
426+
427+
Args:
428+
state: The state dictionary to update
429+
"""
430+
state["name"] = self._name
431+
state["time"] = "template"
432+
state["uptime"] = 0.0
433+
434+
def _add_template_sensor_data(self, state: OrderedDict[str, object]) -> None:
435+
"""Add template sensor data to the state dictionary.
436+
437+
This method adds template data for each sensor type to ensure consistent
438+
structure even when sensors fail during actual data collection.
439+
440+
Args:
441+
state: The state dictionary to update
442+
"""
443+
for index, sensor in enumerate(self._sensors):
444+
self._add_template_for_sensor(state, sensor, index)
445+
446+
def _add_template_for_sensor(
447+
self, state: OrderedDict[str, object], sensor, index: int
448+
) -> None:
449+
"""Add template data for a specific sensor.
450+
451+
Args:
452+
state: The state dictionary to update
453+
sensor: The sensor instance
454+
index: The sensor index
455+
"""
456+
if isinstance(sensor, Processor):
457+
sensor_name = sensor.__class__.__name__
458+
state[f"{sensor_name}_{index}_temperature"] = 0.0
459+
elif isinstance(sensor, Flag):
460+
state[f"{sensor.get_name()}_{index}"] = False
461+
elif isinstance(sensor, Counter):
462+
state[f"{sensor.get_name()}_{index}"] = 0
463+
elif isinstance(sensor, RadioProto):
464+
sensor_name = sensor.__class__.__name__
465+
state[f"{sensor_name}_{index}_modulation"] = "template"
466+
elif isinstance(sensor, IMUProto):
467+
sensor_name = sensor.__class__.__name__
468+
# Add template data for all IMU fields that would be created
469+
state[f"{sensor_name}_{index}_acceleration_timestamp"] = 0.0
470+
state[f"{sensor_name}_{index}_angular_velocity_timestamp"] = 0.0
471+
for i in range(3):
472+
state[f"{sensor_name}_{index}_acceleration_value_{i}"] = 0.0
473+
state[f"{sensor_name}_{index}_angular_velocity_value_{i}"] = 0.0
474+
elif isinstance(sensor, PowerMonitorProto):
475+
sensor_name = sensor.__class__.__name__
476+
state[f"{sensor_name}_{index}_current_avg"] = 0.0
477+
state[f"{sensor_name}_{index}_bus_voltage_avg"] = 0.0
478+
state[f"{sensor_name}_{index}_shunt_voltage_avg"] = 0.0
479+
elif isinstance(sensor, TemperatureSensorProto):
480+
sensor_name = sensor.__class__.__name__
481+
state[f"{sensor_name}_{index}_temperature_timestamp"] = 0.0
482+
state[f"{sensor_name}_{index}_temperature_value"] = 0.0

0 commit comments

Comments
 (0)