TASK-0024A: multi-sensor contract and ingestion (no fusion)#73
TASK-0024A: multi-sensor contract and ingestion (no fusion)#73sonra44 wants to merge 2 commits intotask-0023-policy-v3from
Conversation
Reviewer's GuideIntroduce a multi-sensor radar ingestion layer with explicit Observation/SourceTrack contracts, integrate it into RadarPipeline via new ingestion and render entrypoints while keeping the legacy render_scene path intact, and add tests plus documentation around validation and EventStore emission behavior. Sequence diagram for RadarPipeline.ingest_observations flowsequenceDiagram
actor Client
participant RadarPipeline
participant Ingestion as ingest_observations
participant EventStore
Client->>RadarPipeline: ingest_observations(observations)
RadarPipeline->>Ingestion: ingest_observations(observations, event_store, emit_observation_rx=True)
loop for each Observation
Ingestion->>Ingestion: validate_observation(observation)
alt observation invalid
Ingestion->>EventStore: append_new(SENSOR_OBSERVATION_DROPPED, payload, truth_state=INVALID, reason)
else observation valid
Ingestion->>Ingestion: observation_to_source_track(observation)
Ingestion->>EventStore: append_new(SENSOR_OBSERVATION_RX, payload, truth_state=OK, reason=OBSERVATION_RX)
Ingestion->>EventStore: append_new(SOURCE_TRACK_UPDATED, payload, truth_state=OK, reason=TRACK_UPDATED)
end
end
Ingestion-->>RadarPipeline: tracks_by_source dict~str, list~SourceTrack~~
RadarPipeline-->>Client: tracks_by_source copy
Sequence diagram for RadarPipeline.render_observations end-to-end renderingsequenceDiagram
actor Client
participant RadarPipeline
participant Ingestion as ingest_observations
participant Builder as source_tracks_to_scene
participant EventStore
participant Renderer as render_scene
Client->>RadarPipeline: render_observations(observations, view_state, truth_state, reason, is_fallback)
RadarPipeline->>RadarPipeline: ingest_observations(observations)
RadarPipeline->>Ingestion: ingest_observations(observations, event_store, emit_observation_rx=True)
loop for each Observation
Ingestion->>Ingestion: validate_observation(observation)
alt invalid
Ingestion->>EventStore: append_new(SENSOR_OBSERVATION_DROPPED,...)
else valid
Ingestion->>Ingestion: observation_to_source_track(observation)
Ingestion->>EventStore: append_new(SENSOR_OBSERVATION_RX,...)
Ingestion->>EventStore: append_new(SOURCE_TRACK_UPDATED,...)
end
end
Ingestion-->>RadarPipeline: tracks_by_source dict~str, list~SourceTrack~~
RadarPipeline->>Builder: source_tracks_to_scene(tracks_by_source, truth_state, reason, is_fallback)
Builder-->>RadarPipeline: RadarScene
RadarPipeline->>Renderer: render_scene(scene, view_state)
Renderer-->>RadarPipeline: RenderOutput
RadarPipeline-->>Client: RenderOutput
Class diagram for new Observation and SourceTrack ingestion contractsclassDiagram
class Observation {
+str source_id
+float t
+str track_key
+tuple~float, float~ pos_xy
+tuple~float, float~ vel_xy
+float quality
+float err_radius
+tuple~float, float, float, float~ covariance
+dict~str, Any~ metadata
}
class SourceTrack {
+str source_id
+str source_track_id
+float last_update_t
+tuple~float, float~ state_pos_xy
+tuple~float, float~ state_vel_xy
+float quality
+float trust
+float err_radius
+tuple~float, float, float, float~ covariance
+dict~str, Any~ metadata
}
class RadarPipeline {
-dict~str, list~SourceTrack~~ _tracks_by_source
+ingest_observations(observations list~Observation~) dict~str, list~SourceTrack~~
+render_observations(observations list~Observation~, view_state RadarViewState, truth_state str, reason str, is_fallback bool) RenderOutput
+render_scene(scene RadarScene, view_state RadarViewState) RenderOutput
}
class EventStore {
+append_new(subsystem str, event_type str, payload dict~str, Any~, truth_state TruthState, reason str) None
}
class RadarScene {
+bool ok
+str reason
+str truth_state
+bool is_fallback
+list~RadarPoint~ points
}
class RadarPoint {
+float x
+float y
+float z
+float vr_mps
+dict~str, Any~ metadata
}
class TruthState {
}
RadarPipeline ..> Observation : ingests
RadarPipeline ..> SourceTrack : produces
RadarPipeline ..> RadarScene : renders
RadarPipeline ..> EventStore : emits_events
Observation --> SourceTrack : normalized_by
SourceTrack ..> RadarPoint : mapped_to
RadarScene o--> RadarPoint : contains
EventStore ..> TruthState : uses
Flow diagram for observation validation and event emissionflowchart TD
A[Start ingestion of Observation] --> B[validate_observation]
B -->|invalid| C[Emit SENSOR_OBSERVATION_DROPPED to EventStore]
C --> D[Skip observation]
D --> H[Next observation or finish]
B -->|valid| E[observation_to_source_track]
E --> F[Store or update SourceTrack in tracks_by_source]
F --> G[Emit SENSOR_OBSERVATION_RX and SOURCE_TRACK_UPDATED to EventStore]
G --> H[Next observation or finish]
H --> I[Build tracks_by_source result]
I --> J[Return dict~source_id, list~SourceTrack~~ to caller]
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughОбзорДобавлены три основных компонента для радара: система политик версии 3 на основе YAML с адаптивной загрузкой, многосенсорный контракт приёма наблюдений и интеграция в конвейер обработки с поддержкой управления политик во время работы и обработкой событий. Изменения
Диаграммы последовательностиsequenceDiagram
actor User
participant MCT as Mission<br/>Control<br/>Terminal
participant Ingestion as Ingestion<br/>Pipeline
participant Pipeline as RadarPipeline
participant PolicyLdr as Policy<br/>Loader
participant EventSt as EventStore
User->>MCT: handle_command("policy set combat")
MCT->>Pipeline: set_policy_profile("combat")
Pipeline->>PolicyLdr: load_effective_render_policy_result(profile="combat")
PolicyLdr->>EventSt: POLICY_PROFILE_CHANGED
Pipeline->>EventSt: emit event
MCT->>User: output "policy: profile=combat"
User->>Ingestion: provide List[Observation]
Ingestion->>Ingestion: validate_observation(obs)
alt Invalid observation
Ingestion->>EventSt: SENSOR_OBSERVATION_DROPPED
else Valid
Ingestion->>Ingestion: observation_to_source_track(obs)
Ingestion->>EventSt: SENSOR_OBSERVATION_RX
Ingestion->>EventSt: SOURCE_TRACK_UPDATED
end
Ingestion->>Pipeline: return Dict[source_id, List[SourceTrack]]
Pipeline->>Pipeline: source_tracks_to_scene(tracks_by_source)
Pipeline->>Pipeline: build_render_plan(scene, effective_policy)
Pipeline->>Pipeline: _update_adaptive_state(frame_time_ms, targets_count)
Pipeline->>User: return RenderOutput with telemetry
sequenceDiagram
participant YamlFile as policy_v3.yaml
participant PolicyLdr as Policy<br/>Loader
participant Env as Environment<br/>Variables
participant Pipeline as RadarPipeline
Pipeline->>PolicyLdr: load_effective_render_policy_result(profile, env, yaml_path, strict)
PolicyLdr->>YamlFile: load_policy_yaml(path)
YamlFile-->>PolicyLdr: yaml_doc
PolicyLdr->>PolicyLdr: validate_policy_schema(doc)
alt Validation fails & strict=True
PolicyLdr-->>Pipeline: raise RuntimeError
else Validation fails & strict=False
PolicyLdr->>Env: _env_policy_overrides()
Env-->>PolicyLdr: env_dict
PolicyLdr->>PolicyLdr: fallback with env/default
PolicyLdr-->>Pipeline: RadarPolicyLoadResult(warning=reason)
else Valid
PolicyLdr->>PolicyLdr: build_effective_policy(profile, env, yaml_doc)
PolicyLdr->>PolicyLdr: merge defaults + profile + env overrides
PolicyLdr-->>Pipeline: RadarPolicyLoadResult(render_policy, adaptive_policy, source="yaml")
end
Pipeline->>Pipeline: store render_policy, adaptive_policy, policy_source
Примерная трудозатратность code review🎯 4 (Сложно) | ⏱️ ~50 минут Стихотворение
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Hey - I've found 5 issues, and left some high level feedback:
- In
RadarPipeline.ingest_observations, you both store the result inself._tracks_by_sourceand then rebuild a new{source_id: list(tracks)}dict to return, which is redundant; consider returningself._tracks_by_sourcedirectly or avoiding the instance field if you only need a per-call result. - In
radar_ingestion.ingest_observations,SOURCE_TRACK_UPDATEDis always emitted regardless ofemit_observation_rx; if the intent is to control all ingestion-side emissions, consider adding a separate flag or reusing the same flag to make this behavior configurable.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `RadarPipeline.ingest_observations`, you both store the result in `self._tracks_by_source` and then rebuild a new `{source_id: list(tracks)}` dict to return, which is redundant; consider returning `self._tracks_by_source` directly or avoiding the instance field if you only need a per-call result.
- In `radar_ingestion.ingest_observations`, `SOURCE_TRACK_UPDATED` is always emitted regardless of `emit_observation_rx`; if the intent is to control all ingestion-side emissions, consider adding a separate flag or reusing the same flag to make this behavior configurable.
## Individual Comments
### Comment 1
<location> `src/qiki/services/q_core_agent/core/radar_ingestion.py:40-41` </location>
<code_context>
+ metadata: dict[str, Any] = field(default_factory=dict)
+
+
+def _is_finite(value: float) -> bool:
+ return math.isfinite(float(value))
+
+
</code_context>
<issue_to_address>
**issue:** Make `_is_finite` resilient to non-numeric inputs to avoid unexpected exceptions during validation.
`_is_finite` currently does `float(value)` directly, so non-numeric inputs (e.g., `None`, strings, arbitrary objects) will raise and cause `validate_observation` to error instead of returning an `INVALID_*` result. Consider wrapping the conversion in a try/except and returning `False` on failure so such cases are handled as invalid observations rather than crashing ingestion.
</issue_to_address>
### Comment 2
<location> `src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.py:33` </location>
<code_context>
+ assert dropped[-1].reason == "MISSING_SOURCE_ID"
+
+
+def test_multisource_ingestion_keeps_independent_source_tracks() -> None:
+ pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
+ tracks_by_source = pipeline.ingest_observations(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for track deduplication when multiple observations share the same source_id/track_key.
The multisource test covers per-source bucketing and independence of shared `track_key`s across sources, but not repeated observations for the same `(source_id, track_key)`. Please add a test that ingests two observations with the same `source_id`/`track_key` and different timestamps/positions, and asserts that: (1) only one `SourceTrack` exists for that pair, and (2) it reflects the last observation. This will capture the intended dedup/last-write-wins behavior in `ingest_observations`.
Suggested implementation:
```python
)
]
def test_deduplicates_observations_with_same_source_and_track_key() -> None:
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False)
)
observations = [
Observation(
source_id="sensor-1",
t=1.0,
track_key="trk-1",
pos_xy=(0.0, 0.0),
vel_xy=None,
quality=0.9,
),
Observation(
source_id="sensor-1",
t=2.0,
track_key="trk-1",
pos_xy=(10.0, 5.0),
vel_xy=None,
quality=0.7,
),
]
tracks_by_source = pipeline.ingest_observations(observations)
# (1) Only one SourceTrack for this (source_id, track_key) pair
assert set(tracks_by_source.keys()) == {"sensor-1"}
tracks_for_source = tracks_by_source["sensor-1"]
assert set(tracks_for_source.keys()) == {"trk-1"}
track = tracks_for_source["trk-1"]
# (2) Track reflects the *last* observation
assert track.t == 2.0
assert track.pos_xy == (10.0, 5.0)
```
If the `SourceTrack` API does not expose `t`/`pos_xy` directly (for example, if it keeps a `last_observation` field instead), you should adjust the last two assertions to match the actual API, e.g.:
- `assert track.last_observation.t == 2.0`
- `assert track.last_observation.pos_xy == (10.0, 5.0)`
The rest of the test structure (building the pipeline, ingesting two observations, asserting on the per-source/per-track deduplication) should remain the same.
</issue_to_address>
### Comment 3
<location> `src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.py:62` </location>
<code_context>
+ assert tracks_by_source["radar-b"][0].source_track_id == "trk-1"
+
+
+def test_single_source_render_scene_path_remains_compatible() -> None:
+ legacy_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
+ ingest_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
</code_context>
<issue_to_address>
**suggestion (testing):** Consider explicitly testing the scene-level contract of `source_tracks_to_scene`, including multi-source target_id and empty-input behavior.
This test verifies that legacy `render_scene` and new `render_observations` produce consistent planning stats, but it doesn’t directly exercise the scene-construction contract. Please add focused tests for `source_tracks_to_scene`, e.g.:
- Multi-source: assert `target_id == "{source_id}:{track_id}"`, `metadata` contains `source_id`, `source_track_id`, `quality`, `trust`, `track_index`, and `vr_mps` is derived from velocity.
- Single-source: assert `target_id == track_id`.
- Empty `tracks_by_source`: assert the returned `RadarScene` has `ok is False`, `truth_state == "NO_DATA"`, `reason == "NO_DATA"`, and preserves `is_fallback`.
These will validate the ingestion → scene contract directly, rather than only via render output.
Suggested implementation:
```python
def test_source_tracks_to_scene_multi_source_contract() -> None:
"""
Multi-source scene contract:
- target_id == "{source_id}:{track_id}"
- metadata contains source_id, source_track_id, quality, trust, track_index
- vr_mps is derived from velocity
"""
# NOTE: adjust construction to match your actual source-track type / helper
tracks_by_source = {
"radar-a": [
make_source_track( # type: ignore[name-defined]
source_id="radar-a",
track_id="trk-1",
x=10.0,
y=0.0,
vx=3.0,
vy=4.0,
quality=0.9,
trust=0.8,
track_index=0,
)
],
"radar-b": [
make_source_track( # type: ignore[name-defined]
source_id="radar-b",
track_id="trk-1",
x=15.0,
y=5.0,
vx=0.0,
vy=5.0,
quality=0.7,
trust=0.6,
track_index=0,
)
],
}
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source=tracks_by_source,
is_fallback=False,
)
assert scene.ok is True
assert scene.truth_state == "OK"
assert scene.reason == "OK"
assert scene.is_fallback is False
# We expect one rendered point per source-track
assert len(scene.points) == 2
# Index by target_id for easier assertions
points_by_target_id = {p.target_id: p for p in scene.points} # type: ignore[attr-defined]
point_a = points_by_target_id["radar-a:trk-1"]
point_b = points_by_target_id["radar-b:trk-1"]
# target_id encoding
assert point_a.target_id == "radar-a:trk-1" # type: ignore[attr-defined]
assert point_b.target_id == "radar-b:trk-1" # type: ignore[attr-defined]
# metadata contract
for point, source_id, quality, trust, track_index in [
(point_a, "radar-a", 0.9, 0.8, 0),
(point_b, "radar-b", 0.7, 0.6, 0),
]:
md = point.metadata # type: ignore[attr-defined]
assert md["source_id"] == source_id
assert md["source_track_id"] == "trk-1"
assert md["quality"] == quality
assert md["trust"] == trust
assert md["track_index"] == track_index
# vr_mps derived from velocity magnitude: sqrt(vx^2 + vy^2)
# radar-a: vx=3, vy=4 -> 5 m/s
assert point_a.vr_mps == pytest.approx(5.0, rel=1e-3) # type: ignore[attr-defined]
# radar-b: vx=0, vy=5 -> 5 m/s
assert point_b.vr_mps == pytest.approx(5.0, rel=1e-3) # type: ignore[attr-defined]
def test_source_tracks_to_scene_single_source_contract() -> None:
"""
Single-source scene contract:
- target_id == track_id (no "{source_id}:" prefix)
"""
# NOTE: adjust construction to match your actual source-track type / helper
tracks_by_source = {
"radar-a": [
make_source_track( # type: ignore[name-defined]
source_id="radar-a",
track_id="trk-1",
x=10.0,
y=0.0,
vx=1.0,
vy=0.0,
quality=1.0,
trust=1.0,
track_index=0,
)
]
}
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source=tracks_by_source,
is_fallback=False,
)
assert scene.ok is True
assert scene.truth_state == "OK"
assert scene.reason == "OK"
assert scene.is_fallback is False
# Single source: we expect target_id == track_id
assert len(scene.points) == 1
point = scene.points[0]
assert point.target_id == "trk-1" # type: ignore[attr-defined]
md = point.metadata # type: ignore[attr-defined]
assert md["source_id"] == "radar-a"
assert md["source_track_id"] == "trk-1"
assert md["track_index"] == 0
def test_source_tracks_to_scene_empty_tracks_by_source() -> None:
"""
Empty-input scene contract:
- ok is False
- truth_state == "NO_DATA"
- reason == "NO_DATA"
- is_fallback is preserved
"""
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source={},
is_fallback=True,
)
assert scene.ok is False
assert scene.truth_state == "NO_DATA"
assert scene.reason == "NO_DATA"
assert scene.is_fallback is True
def test_single_source_render_scene_path_remains_compatible() -> None:
legacy_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
ingest_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
legacy_scene = RadarScene(
ok=True,
reason="OK",
truth_state="OK",
is_fallback=False,
points=[
RadarPoint(
x=12.0,
y=3.0,
z=0.0,
```
1. Ensure `pytest` is imported at the top of the file (e.g. `import pytest`) if it is not already.
2. Replace the placeholder `make_source_track(...)` helper calls with the actual way you construct per-source track objects in this test module. For example, you might:
- Use an existing factory/fixture, or
- Directly instantiate your track type, e.g. `RadarSourceTrack(...)` or similar.
3. Update the `source_tracks_to_scene(...)` calls to match the real function signature (e.g. additional parameters such as timestamp, ego pose, or config). The tests are written to focus on the scene contract; wire in any required extra arguments.
4. If your `RadarPoint` objects expose different attribute names for `target_id`, `metadata`, or `vr_mps`, adjust the assertions to match (e.g. `point.id` instead of `point.target_id`, or `point.info` instead of `point.metadata`).
5. If the exact `truth_state` / `reason` strings differ slightly (for example, constants or enums instead of raw `"OK"` / `"NO_DATA"`), replace the string literals with the appropriate values or enum references used elsewhere in the tests.
</issue_to_address>
### Comment 4
<location> `src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.py:99` </location>
<code_context>
+ assert legacy_output.plan.stats.lod_level == ingest_output.plan.stats.lod_level
+
+
+def test_source_track_updated_event_contains_contract_fields() -> None:
+ store = EventStore(maxlen=50, enabled=True)
+ pipeline = RadarPipeline(
</code_context>
<issue_to_address>
**suggestion (testing):** Extend event-related tests to cover `SENSOR_OBSERVATION_RX` and the `emit_observation_rx` flag.
Since `ingest_observations` can emit both `SENSOR_OBSERVATION_RX` and `SOURCE_TRACK_UPDATED`, and `emit_observation_rx` gates the former, it would be valuable to:
- Add a test that validates a `SENSOR_OBSERVATION_RX` event is emitted for a valid observation (including checking key payload fields/values).
- Add a test where `emit_observation_rx=False`, asserting that `SOURCE_TRACK_UPDATED` is still emitted but `SENSOR_OBSERVATION_RX` is not.
This will better lock in the event emission contract and flag behavior for consumers of the event stream.
Suggested implementation:
```python
assert legacy_output.plan.stats.targets_count == ingest_output.plan.stats.targets_count
assert legacy_output.plan.stats.lod_level == ingest_output.plan.stats.lod_level
def test_sensor_observation_rx_event_emitted_for_valid_observation() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
# Explicitly enable emission of SENSOR_OBSERVATION_RX events
emit_observation_rx=True,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="src-1",
t=123.0,
track_key="trk-1",
pos_xy=(1.0, 2.0),
# Include any other required fields for a valid observation here
# (e.g. vel_xy, quality, heading, radar_id, etc.)
)
]
)
# Ensure the ingest call produced tracks (sanity check; adjust if not needed)
assert tracks_by_source
# Filter events for SENSOR_OBSERVATION_RX
sensor_observation_rx_events = [
event
for event in store.events
if getattr(event, "type", getattr(event, "event_type", None)) == SENSOR_OBSERVATION_RX
]
# One or more SENSOR_OBSERVATION_RX events must be emitted for a valid observation
assert sensor_observation_rx_events, "Expected at least one SENSOR_OBSERVATION_RX event"
# Validate key contract fields in the first event payload
event = sensor_observation_rx_events[0]
payload = getattr(event, "payload", getattr(event, "body", None))
# These assertions should mirror the contract of SENSOR_OBSERVATION_RX events.
# Adjust field paths if your event model differs.
assert payload is not None
assert payload["source_id"] == "src-1"
assert payload["track_key"] == "trk-1"
assert payload["t"] == 123.0
assert tuple(payload["pos_xy"]) == (1.0, 2.0)
def test_source_track_updated_emitted_when_emit_observation_rx_disabled() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
# Explicitly disable emission of SENSOR_OBSERVATION_RX events
emit_observation_rx=False,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="src-1",
t=234.0,
track_key="trk-2",
pos_xy=(3.0, 4.0),
# Include any other required fields for a valid observation here
)
]
)
# Ensure the ingest call produced tracks (sanity check; adjust if not needed)
assert tracks_by_source
# Collect event types for easier assertions
event_types = [
getattr(event, "type", getattr(event, "event_type", None)) for event in store.events
]
# SOURCE_TRACK_UPDATED must still be emitted
assert SOURCE_TRACK_UPDATED in event_types
# SENSOR_OBSERVATION_RX must not be emitted when emit_observation_rx=False
assert SENSOR_OBSERVATION_RX not in event_types
```
To fully integrate these tests you will likely need to:
1. **Imports**
- Ensure `SENSOR_OBSERVATION_RX` and `SOURCE_TRACK_UPDATED` are imported at the top of the file from the module where your event types are defined. For example (adjust the module path to match your project):
```python
from qiki.events import SENSOR_OBSERVATION_RX, SOURCE_TRACK_UPDATED
```
- If `EventStore`, `RadarPipeline`, `RadarRenderConfig`, and `Observation` are not already imported in this file, make sure they are imported consistently with the rest of the tests.
2. **Event model access**
- The code currently uses:
```python
store.events
getattr(event, "type", getattr(event, "event_type", None))
getattr(event, "payload", getattr(event, "body", None))
```
Align these with your real event store / event model:
- If your tests elsewhere do `store.get_events()` or similar, switch to that.
- Use the same attribute used in the existing `test_source_track_updated_event_contains_contract_fields` test to access the event type and payload (e.g. `event.kind`, `event.meta.event_type`, `event.data`, etc.).
- Update the payload field paths (`payload["source_id"]`, `payload["track_key"]`, `payload["t"]`, `payload["pos_xy"]`) to mirror the actual contract you have for `SENSOR_OBSERVATION_RX`. You can usually infer this from how `SOURCE_TRACK_UPDATED` payloads are asserted in the existing test.
3. **Observation construction**
- If `Observation` requires additional mandatory fields (such as `vel_xy`, `quality`, `heading`, or radar-specific identifiers), add them in both new tests to ensure the observation is considered valid by `ingest_observations`. Mirror the pattern used in other tests in this file.
4. **RadarPipeline signature**
- Confirm that `RadarPipeline` accepts an `emit_observation_rx` boolean keyword argument. If it uses a different name (e.g. `emit_sensor_observation_rx` or is nested in a config object), update the tests to use the correct parameter/structure.
Once these adjustments are made to match your existing conventions, the two new tests will:
- Lock in that `SENSOR_OBSERVATION_RX` events are emitted (with key contract fields) for valid observations when `emit_observation_rx=True`.
- Verify that disabling `emit_observation_rx` suppresses `SENSOR_OBSERVATION_RX` while still emitting `SOURCE_TRACK_UPDATED` for consumers of the event stream.
</issue_to_address>
### Comment 5
<location> `src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.py:17-26` </location>
<code_context>
+ )
+ pipeline.ingest_observations(
+ [
+ Observation(
+ source_id="radar-a",
+ t=100.5,
+ track_key="trk-42",
+ pos_xy=(1.0, 2.0),
+ vel_xy=(0.1, -0.2),
+ quality=0.6,
+ )
+ ]
</code_context>
<issue_to_address>
**suggestion (testing):** Add explicit tests for quality clamping and the trust/quality relationship.
The implementation clamps `quality` to `[0.0, 1.0]` and mirrors it into `trust`, but no tests assert this contract directly. Please add focused ingestion-level tests that:
- Cover `quality < 0`, `quality > 1`, and NaN/non-numeric values, asserting clamping to `[0, 1]` and that `trust` equals the clamped value.
- Cover an in-range value, asserting that `quality` and `trust` on `SourceTrack` and in emitted events match the clamped value.
This will make the behavior explicit and resilient to internal changes.
Suggested implementation:
```python
def test_observation_quality_is_clamped_and_trust_mirrors_clamped_value_below_zero() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=-1.0,
)
]
)
# Expect clamping to 0.0 and mirroring into trust on the SourceTrack.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 0.0
assert track.trust == 0.0
# Expect emitted events to carry the same clamped quality/trust.
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 0.0
assert last_event.trust == 0.0
def test_observation_quality_is_clamped_and_trust_mirrors_clamped_value_above_one() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=2.5,
)
]
)
# Expect clamping to 1.0 and mirroring into trust on the SourceTrack.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 1.0
assert track.trust == 1.0
# Expect emitted events to carry the same clamped quality/trust.
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 1.0
assert last_event.trust == 1.0
def test_observation_quality_nan_is_clamped_and_trust_mirrors_clamped_value() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=float("nan"),
)
]
)
# NaN/non-numeric quality is treated as out-of-range and clamped into [0.0, 1.0].
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert 0.0 <= track.quality <= 1.0
assert track.trust == track.quality
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert 0.0 <= last_event.quality <= 1.0
assert last_event.trust == last_event.quality
def test_observation_quality_in_range_propagates_to_source_track_and_events() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=0.6,
)
]
)
# In-range qualities should pass through unchanged and be mirrored into trust.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 0.6
assert track.trust == 0.6
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 0.6
assert last_event.trust == 0.6
def test_multisource_ingestion_keeps_independent_source_tracks() -> None:
pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
tracks_by_source = pipeline.ingest_observations(
```
These tests assume:
1. The ingestion event type for successful sensor observations is `event_type="SENSOR_OBSERVATION_INGESTED"` on the `SENSORS` subsystem.
2. The event object exposed via `EventStore.filter(...)` has `quality` and `trust` attributes reflecting the clamped values used for tracking.
3. `SourceTrack` instances returned from `pipeline.ingest_observations(...)` are accessible via `tracks_by_source[source_id][track_key]` and expose `quality` and `trust` attributes.
If your actual event type name, event payload shape, or `SourceTrack` attribute names differ, adjust:
- The `event_type` string passed into `store.filter(...)`.
- The attributes read from `last_event` (for example, `last_event.observation.quality` instead of `last_event.quality`).
- The attributes read from `track` (for example, `track.state.quality` instead of `track.quality`).
If NaN values are clamped to a specific boundary (e.g., always `0.0`) instead of “any value in `[0.0, 1.0]`”, tighten the assertions in `test_observation_quality_nan_is_clamped_and_trust_mirrors_clamped_value` to assert the exact expected value.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def _is_finite(value: float) -> bool: | ||
| return math.isfinite(float(value)) |
There was a problem hiding this comment.
issue: Make _is_finite resilient to non-numeric inputs to avoid unexpected exceptions during validation.
_is_finite currently does float(value) directly, so non-numeric inputs (e.g., None, strings, arbitrary objects) will raise and cause validate_observation to error instead of returning an INVALID_* result. Consider wrapping the conversion in a try/except and returning False on failure so such cases are handled as invalid observations rather than crashing ingestion.
| assert dropped[-1].reason == "MISSING_SOURCE_ID" | ||
|
|
||
|
|
||
| def test_multisource_ingestion_keeps_independent_source_tracks() -> None: |
There was a problem hiding this comment.
suggestion (testing): Add a test for track deduplication when multiple observations share the same source_id/track_key.
The multisource test covers per-source bucketing and independence of shared track_keys across sources, but not repeated observations for the same (source_id, track_key). Please add a test that ingests two observations with the same source_id/track_key and different timestamps/positions, and asserts that: (1) only one SourceTrack exists for that pair, and (2) it reflects the last observation. This will capture the intended dedup/last-write-wins behavior in ingest_observations.
Suggested implementation:
)
]
def test_deduplicates_observations_with_same_source_and_track_key() -> None:
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False)
)
observations = [
Observation(
source_id="sensor-1",
t=1.0,
track_key="trk-1",
pos_xy=(0.0, 0.0),
vel_xy=None,
quality=0.9,
),
Observation(
source_id="sensor-1",
t=2.0,
track_key="trk-1",
pos_xy=(10.0, 5.0),
vel_xy=None,
quality=0.7,
),
]
tracks_by_source = pipeline.ingest_observations(observations)
# (1) Only one SourceTrack for this (source_id, track_key) pair
assert set(tracks_by_source.keys()) == {"sensor-1"}
tracks_for_source = tracks_by_source["sensor-1"]
assert set(tracks_for_source.keys()) == {"trk-1"}
track = tracks_for_source["trk-1"]
# (2) Track reflects the *last* observation
assert track.t == 2.0
assert track.pos_xy == (10.0, 5.0)If the SourceTrack API does not expose t/pos_xy directly (for example, if it keeps a last_observation field instead), you should adjust the last two assertions to match the actual API, e.g.:
assert track.last_observation.t == 2.0assert track.last_observation.pos_xy == (10.0, 5.0)
The rest of the test structure (building the pipeline, ingesting two observations, asserting on the per-source/per-track deduplication) should remain the same.
| assert tracks_by_source["radar-b"][0].source_track_id == "trk-1" | ||
|
|
||
|
|
||
| def test_single_source_render_scene_path_remains_compatible() -> None: |
There was a problem hiding this comment.
suggestion (testing): Consider explicitly testing the scene-level contract of source_tracks_to_scene, including multi-source target_id and empty-input behavior.
This test verifies that legacy render_scene and new render_observations produce consistent planning stats, but it doesn’t directly exercise the scene-construction contract. Please add focused tests for source_tracks_to_scene, e.g.:
- Multi-source: assert
target_id == "{source_id}:{track_id}",metadatacontainssource_id,source_track_id,quality,trust,track_index, andvr_mpsis derived from velocity. - Single-source: assert
target_id == track_id. - Empty
tracks_by_source: assert the returnedRadarScenehasok is False,truth_state == "NO_DATA",reason == "NO_DATA", and preservesis_fallback.
These will validate the ingestion → scene contract directly, rather than only via render output.
Suggested implementation:
def test_source_tracks_to_scene_multi_source_contract() -> None:
"""
Multi-source scene contract:
- target_id == "{source_id}:{track_id}"
- metadata contains source_id, source_track_id, quality, trust, track_index
- vr_mps is derived from velocity
"""
# NOTE: adjust construction to match your actual source-track type / helper
tracks_by_source = {
"radar-a": [
make_source_track( # type: ignore[name-defined]
source_id="radar-a",
track_id="trk-1",
x=10.0,
y=0.0,
vx=3.0,
vy=4.0,
quality=0.9,
trust=0.8,
track_index=0,
)
],
"radar-b": [
make_source_track( # type: ignore[name-defined]
source_id="radar-b",
track_id="trk-1",
x=15.0,
y=5.0,
vx=0.0,
vy=5.0,
quality=0.7,
trust=0.6,
track_index=0,
)
],
}
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source=tracks_by_source,
is_fallback=False,
)
assert scene.ok is True
assert scene.truth_state == "OK"
assert scene.reason == "OK"
assert scene.is_fallback is False
# We expect one rendered point per source-track
assert len(scene.points) == 2
# Index by target_id for easier assertions
points_by_target_id = {p.target_id: p for p in scene.points} # type: ignore[attr-defined]
point_a = points_by_target_id["radar-a:trk-1"]
point_b = points_by_target_id["radar-b:trk-1"]
# target_id encoding
assert point_a.target_id == "radar-a:trk-1" # type: ignore[attr-defined]
assert point_b.target_id == "radar-b:trk-1" # type: ignore[attr-defined]
# metadata contract
for point, source_id, quality, trust, track_index in [
(point_a, "radar-a", 0.9, 0.8, 0),
(point_b, "radar-b", 0.7, 0.6, 0),
]:
md = point.metadata # type: ignore[attr-defined]
assert md["source_id"] == source_id
assert md["source_track_id"] == "trk-1"
assert md["quality"] == quality
assert md["trust"] == trust
assert md["track_index"] == track_index
# vr_mps derived from velocity magnitude: sqrt(vx^2 + vy^2)
# radar-a: vx=3, vy=4 -> 5 m/s
assert point_a.vr_mps == pytest.approx(5.0, rel=1e-3) # type: ignore[attr-defined]
# radar-b: vx=0, vy=5 -> 5 m/s
assert point_b.vr_mps == pytest.approx(5.0, rel=1e-3) # type: ignore[attr-defined]
def test_source_tracks_to_scene_single_source_contract() -> None:
"""
Single-source scene contract:
- target_id == track_id (no "{source_id}:" prefix)
"""
# NOTE: adjust construction to match your actual source-track type / helper
tracks_by_source = {
"radar-a": [
make_source_track( # type: ignore[name-defined]
source_id="radar-a",
track_id="trk-1",
x=10.0,
y=0.0,
vx=1.0,
vy=0.0,
quality=1.0,
trust=1.0,
track_index=0,
)
]
}
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source=tracks_by_source,
is_fallback=False,
)
assert scene.ok is True
assert scene.truth_state == "OK"
assert scene.reason == "OK"
assert scene.is_fallback is False
# Single source: we expect target_id == track_id
assert len(scene.points) == 1
point = scene.points[0]
assert point.target_id == "trk-1" # type: ignore[attr-defined]
md = point.metadata # type: ignore[attr-defined]
assert md["source_id"] == "radar-a"
assert md["source_track_id"] == "trk-1"
assert md["track_index"] == 0
def test_source_tracks_to_scene_empty_tracks_by_source() -> None:
"""
Empty-input scene contract:
- ok is False
- truth_state == "NO_DATA"
- reason == "NO_DATA"
- is_fallback is preserved
"""
scene = source_tracks_to_scene( # type: ignore[name-defined]
tracks_by_source={},
is_fallback=True,
)
assert scene.ok is False
assert scene.truth_state == "NO_DATA"
assert scene.reason == "NO_DATA"
assert scene.is_fallback is True
def test_single_source_render_scene_path_remains_compatible() -> None:
legacy_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
ingest_pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
legacy_scene = RadarScene(
ok=True,
reason="OK",
truth_state="OK",
is_fallback=False,
points=[
RadarPoint(
x=12.0,
y=3.0,
z=0.0,- Ensure
pytestis imported at the top of the file (e.g.import pytest) if it is not already. - Replace the placeholder
make_source_track(...)helper calls with the actual way you construct per-source track objects in this test module. For example, you might:- Use an existing factory/fixture, or
- Directly instantiate your track type, e.g.
RadarSourceTrack(...)or similar.
- Update the
source_tracks_to_scene(...)calls to match the real function signature (e.g. additional parameters such as timestamp, ego pose, or config). The tests are written to focus on the scene contract; wire in any required extra arguments. - If your
RadarPointobjects expose different attribute names fortarget_id,metadata, orvr_mps, adjust the assertions to match (e.g.point.idinstead ofpoint.target_id, orpoint.infoinstead ofpoint.metadata). - If the exact
truth_state/reasonstrings differ slightly (for example, constants or enums instead of raw"OK"/"NO_DATA"), replace the string literals with the appropriate values or enum references used elsewhere in the tests.
| assert legacy_output.plan.stats.lod_level == ingest_output.plan.stats.lod_level | ||
|
|
||
|
|
||
| def test_source_track_updated_event_contains_contract_fields() -> None: |
There was a problem hiding this comment.
suggestion (testing): Extend event-related tests to cover SENSOR_OBSERVATION_RX and the emit_observation_rx flag.
Since ingest_observations can emit both SENSOR_OBSERVATION_RX and SOURCE_TRACK_UPDATED, and emit_observation_rx gates the former, it would be valuable to:
- Add a test that validates a
SENSOR_OBSERVATION_RXevent is emitted for a valid observation (including checking key payload fields/values). - Add a test where
emit_observation_rx=False, asserting thatSOURCE_TRACK_UPDATEDis still emitted butSENSOR_OBSERVATION_RXis not.
This will better lock in the event emission contract and flag behavior for consumers of the event stream.
Suggested implementation:
assert legacy_output.plan.stats.targets_count == ingest_output.plan.stats.targets_count
assert legacy_output.plan.stats.lod_level == ingest_output.plan.stats.lod_level
def test_sensor_observation_rx_event_emitted_for_valid_observation() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
# Explicitly enable emission of SENSOR_OBSERVATION_RX events
emit_observation_rx=True,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="src-1",
t=123.0,
track_key="trk-1",
pos_xy=(1.0, 2.0),
# Include any other required fields for a valid observation here
# (e.g. vel_xy, quality, heading, radar_id, etc.)
)
]
)
# Ensure the ingest call produced tracks (sanity check; adjust if not needed)
assert tracks_by_source
# Filter events for SENSOR_OBSERVATION_RX
sensor_observation_rx_events = [
event
for event in store.events
if getattr(event, "type", getattr(event, "event_type", None)) == SENSOR_OBSERVATION_RX
]
# One or more SENSOR_OBSERVATION_RX events must be emitted for a valid observation
assert sensor_observation_rx_events, "Expected at least one SENSOR_OBSERVATION_RX event"
# Validate key contract fields in the first event payload
event = sensor_observation_rx_events[0]
payload = getattr(event, "payload", getattr(event, "body", None))
# These assertions should mirror the contract of SENSOR_OBSERVATION_RX events.
# Adjust field paths if your event model differs.
assert payload is not None
assert payload["source_id"] == "src-1"
assert payload["track_key"] == "trk-1"
assert payload["t"] == 123.0
assert tuple(payload["pos_xy"]) == (1.0, 2.0)
def test_source_track_updated_emitted_when_emit_observation_rx_disabled() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
# Explicitly disable emission of SENSOR_OBSERVATION_RX events
emit_observation_rx=False,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="src-1",
t=234.0,
track_key="trk-2",
pos_xy=(3.0, 4.0),
# Include any other required fields for a valid observation here
)
]
)
# Ensure the ingest call produced tracks (sanity check; adjust if not needed)
assert tracks_by_source
# Collect event types for easier assertions
event_types = [
getattr(event, "type", getattr(event, "event_type", None)) for event in store.events
]
# SOURCE_TRACK_UPDATED must still be emitted
assert SOURCE_TRACK_UPDATED in event_types
# SENSOR_OBSERVATION_RX must not be emitted when emit_observation_rx=False
assert SENSOR_OBSERVATION_RX not in event_typesTo fully integrate these tests you will likely need to:
-
Imports
- Ensure
SENSOR_OBSERVATION_RXandSOURCE_TRACK_UPDATEDare imported at the top of the file from the module where your event types are defined. For example (adjust the module path to match your project):from qiki.events import SENSOR_OBSERVATION_RX, SOURCE_TRACK_UPDATED
- If
EventStore,RadarPipeline,RadarRenderConfig, andObservationare not already imported in this file, make sure they are imported consistently with the rest of the tests.
- Ensure
-
Event model access
- The code currently uses:
Align these with your real event store / event model:
store.events getattr(event, "type", getattr(event, "event_type", None)) getattr(event, "payload", getattr(event, "body", None))
- If your tests elsewhere do
store.get_events()or similar, switch to that. - Use the same attribute used in the existing
test_source_track_updated_event_contains_contract_fieldstest to access the event type and payload (e.g.event.kind,event.meta.event_type,event.data, etc.). - Update the payload field paths (
payload["source_id"],payload["track_key"],payload["t"],payload["pos_xy"]) to mirror the actual contract you have forSENSOR_OBSERVATION_RX. You can usually infer this from howSOURCE_TRACK_UPDATEDpayloads are asserted in the existing test.
- If your tests elsewhere do
- The code currently uses:
-
Observation construction
- If
Observationrequires additional mandatory fields (such asvel_xy,quality,heading, or radar-specific identifiers), add them in both new tests to ensure the observation is considered valid byingest_observations. Mirror the pattern used in other tests in this file.
- If
-
RadarPipeline signature
- Confirm that
RadarPipelineaccepts anemit_observation_rxboolean keyword argument. If it uses a different name (e.g.emit_sensor_observation_rxor is nested in a config object), update the tests to use the correct parameter/structure.
- Confirm that
Once these adjustments are made to match your existing conventions, the two new tests will:
- Lock in that
SENSOR_OBSERVATION_RXevents are emitted (with key contract fields) for valid observations whenemit_observation_rx=True. - Verify that disabling
emit_observation_rxsuppressesSENSOR_OBSERVATION_RXwhile still emittingSOURCE_TRACK_UPDATEDfor consumers of the event stream.
| Observation( | ||
| source_id="", | ||
| t=123.0, | ||
| track_key="trk-1", | ||
| pos_xy=(1.0, 2.0), | ||
| vel_xy=None, | ||
| quality=0.8, | ||
| ) | ||
| ] | ||
| ) |
There was a problem hiding this comment.
suggestion (testing): Add explicit tests for quality clamping and the trust/quality relationship.
The implementation clamps quality to [0.0, 1.0] and mirrors it into trust, but no tests assert this contract directly. Please add focused ingestion-level tests that:
- Cover
quality < 0,quality > 1, and NaN/non-numeric values, asserting clamping to[0, 1]and thattrustequals the clamped value. - Cover an in-range value, asserting that
qualityandtrustonSourceTrackand in emitted events match the clamped value.
This will make the behavior explicit and resilient to internal changes.
Suggested implementation:
def test_observation_quality_is_clamped_and_trust_mirrors_clamped_value_below_zero() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=-1.0,
)
]
)
# Expect clamping to 0.0 and mirroring into trust on the SourceTrack.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 0.0
assert track.trust == 0.0
# Expect emitted events to carry the same clamped quality/trust.
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 0.0
assert last_event.trust == 0.0
def test_observation_quality_is_clamped_and_trust_mirrors_clamped_value_above_one() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=2.5,
)
]
)
# Expect clamping to 1.0 and mirroring into trust on the SourceTrack.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 1.0
assert track.trust == 1.0
# Expect emitted events to carry the same clamped quality/trust.
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 1.0
assert last_event.trust == 1.0
def test_observation_quality_nan_is_clamped_and_trust_mirrors_clamped_value() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=float("nan"),
)
]
)
# NaN/non-numeric quality is treated as out-of-range and clamped into [0.0, 1.0].
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert 0.0 <= track.quality <= 1.0
assert track.trust == track.quality
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert 0.0 <= last_event.quality <= 1.0
assert last_event.trust == last_event.quality
def test_observation_quality_in_range_propagates_to_source_track_and_events() -> None:
store = EventStore(maxlen=50, enabled=True)
pipeline = RadarPipeline(
RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False),
event_store=store,
)
tracks_by_source = pipeline.ingest_observations(
[
Observation(
source_id="radar-a",
t=100.5,
track_key="trk-42",
pos_xy=(1.0, 2.0),
vel_xy=(0.1, -0.2),
quality=0.6,
)
]
)
# In-range qualities should pass through unchanged and be mirrored into trust.
source_tracks = tracks_by_source["radar-a"]
track = source_tracks["trk-42"]
assert track.quality == 0.6
assert track.trust == 0.6
ingested_events = store.filter(subsystem="SENSORS", event_type="SENSOR_OBSERVATION_INGESTED")
assert ingested_events
last_event = ingested_events[-1]
assert last_event.quality == 0.6
assert last_event.trust == 0.6
def test_multisource_ingestion_keeps_independent_source_tracks() -> None:
pipeline = RadarPipeline(RadarRenderConfig(renderer="unicode", view="top", fps_max=10, color=False))
tracks_by_source = pipeline.ingest_observations(These tests assume:
- The ingestion event type for successful sensor observations is
event_type="SENSOR_OBSERVATION_INGESTED"on theSENSORSsubsystem. - The event object exposed via
EventStore.filter(...)hasqualityandtrustattributes reflecting the clamped values used for tracking. SourceTrackinstances returned frompipeline.ingest_observations(...)are accessible viatracks_by_source[source_id][track_key]and exposequalityandtrustattributes.
If your actual event type name, event payload shape, or SourceTrack attribute names differ, adjust:
- The
event_typestring passed intostore.filter(...). - The attributes read from
last_event(for example,last_event.observation.qualityinstead oflast_event.quality). - The attributes read from
track(for example,track.state.qualityinstead oftrack.quality).
If NaN values are clamped to a specific boundary (e.g., always 0.0) instead of “any value in [0.0, 1.0]”, tighten the assertions in test_observation_quality_nan_is_clamped_and_trust_mirrors_clamped_value to assert the exact expected value.
Summary
ObservationandSourceTrackRadarPipeline:ingest_observationsandrender_observationsrender_scene(RadarScene)path backward compatibleSENSOR_OBSERVATION_RX,SOURCE_TRACK_UPDATED,SENSOR_OBSERVATION_DROPPEDTASKS/TASK_0024A_multisensor_contract.mdTests
pytest -q src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.py src/qiki/services/q_core_agent/tests/test_radar_pipeline.pyruff check src/qiki/services/q_core_agent/core/radar_ingestion.py src/qiki/services/q_core_agent/core/radar_pipeline.py src/qiki/services/q_core_agent/tests/test_radar_multisensor_ingestion.pySummary by Sourcery
Introduce a multi-sensor observation ingestion layer and wire it into the radar pipeline while preserving the existing scene-based rendering path.
New Features:
Bug Fixes:
Enhancements:
Documentation:
Tests:
Summary by CodeRabbit
Примечания к выпуску
Новые функции
policy setиpolicy cycle.Тесты