Skip to content
15 changes: 15 additions & 0 deletions src/flockwave/server/ext/rtk/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class RTKExtension(Extension):
_statistics: RTKStatistics
_survey_settings: RTKSurveySettings
_tx_queue: SendChannel | None = None
_config_fixed_position: Any | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be typed properly but I'll deal with this myself.


def __init__(self):
"""Constructor."""
Expand Down Expand Up @@ -201,6 +202,7 @@ def configure(self, configuration: dict[str, Any]) -> None:
if isinstance(fixed, (list, tuple)):
fixed = {"position": list(fixed)}

self._config_fixed_position = None
if isinstance(fixed, dict) and "position" in fixed:
if "accuracy" not in fixed:
self.log.warning(
Expand All @@ -224,6 +226,8 @@ def configure(self, configuration: dict[str, Any]) -> None:

position = self._survey_settings.position
if position is not None:
# Store the fixed position from config so we can restore it when switching RTK sources.
self._config_fixed_position = position
coord = ECEFToGPSCoordinateTransformation().to_gps(position).format()
self.log.info(
f"Base station is fixed at {coord} (accuracy: {accuracy}m)"
Expand Down Expand Up @@ -370,6 +374,9 @@ def handle_RTK_SOURCE(
else:
preset_id, desired_preset = None, None

# Reset fixed position when switching RTK source, but restore from config if available
self._survey_settings.position = self._config_fixed_position

self._request_preset_switch_later(desired_preset)
self._last_preset_request_from_user = (
RTKPresetRequest(preset_id=preset_id)
Expand Down Expand Up @@ -411,6 +418,10 @@ def handle_RTK_SURVEY(self, message: FlockwaveMessage, sender, hub: MessageHub):
error = "Settings object missing or invalid"

if error is None:
position = self._survey_settings.position
if position is not None:
# Populate antenna.position immediately so RTK-STAT reflects it
self._statistics.set_antenna_position_from_ecef(position)
self._request_survey()

return hub.acknowledge(message, outcome=error is None, reason=error)
Expand Down Expand Up @@ -808,6 +819,10 @@ async def _run_survey(self, preset, connection, *, task_status) -> None:
self._statistics.set_to_fixed_with_accuracy(
accuracy_cm / 100.0
)
if position is not None:
self._statistics.set_antenna_position_from_ecef(
position
)
else:
self.log.error(
f"Failed to configure {preset.title!r}",
Expand Down
18 changes: 15 additions & 3 deletions src/flockwave/server/ext/rtk/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,17 @@ def notify(self, packet: RTCMV3Packet) -> None:

position = getattr(packet, "position", None)
if position is not None:
self.position = _ecef_to_gps.to_gps(position)
self.position_ecef = position
self._antenna_position_timestamp = monotonic()
self.set_from_ecef(position)

def set_from_ecef(self, position: ECEFCoordinate) -> None:
"""Sets the antenna position from an ECEF coordinate.
Computes and stores the corresponding GPS coordinate and refreshes
the internal timestamp used to track the age of the last antenna
position observation.
"""
self.position = _ecef_to_gps.to_gps(position)
self.position_ecef = position
self._antenna_position_timestamp = monotonic()

def _forget_old_antenna_position_if_needed(self) -> None:
"""Clears the position of the antenna we have not received another
Expand Down Expand Up @@ -425,6 +433,10 @@ def set_to_fixed_with_accuracy(self, accuracy: float) -> None:
"""
self._survey_status.set_to_fixed_with_accuracy(accuracy)

def set_antenna_position_from_ecef(self, position: ECEFCoordinate) -> None:
"""Sets the antenna position directly from an ECEF coordinate."""
self._antenna_information.set_from_ecef(position)

@contextmanager
def use(self):
"""Context manager that clears the statistics object upon entering
Expand Down