Skip to content

Commit c5ed148

Browse files
Fix race condition in statistics that created spikes (#129066)
* fixed race condition and added test case for updates before db load * removed duplicated code * improved comments, removed superfluous errors / assertions * allow both possible outcomes of race condition * use approx for float comparison * Update tests/components/statistics/test_sensor.py Co-authored-by: Erik Montnemery <[email protected]> * force new state before database load in race condition test --------- Co-authored-by: Erik Montnemery <[email protected]>
1 parent e774c71 commit c5ed148

File tree

3 files changed

+95
-17
lines changed

3 files changed

+95
-17
lines changed

homeassistant/components/statistics/config_flow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ async def async_setup_preview(hass: HomeAssistant) -> None:
169169
vol.Required("user_input"): dict,
170170
}
171171
)
172-
@callback
173-
def ws_start_preview(
172+
@websocket_api.async_response
173+
async def ws_start_preview(
174174
hass: HomeAssistant,
175175
connection: websocket_api.ActiveConnection,
176176
msg: dict[str, Any],
@@ -234,6 +234,6 @@ def async_preview_updated(state: str, attributes: Mapping[str, Any]) -> None:
234234
preview_entity.hass = hass
235235

236236
connection.send_result(msg["id"])
237-
connection.subscriptions[msg["id"]] = preview_entity.async_start_preview(
237+
connection.subscriptions[msg["id"]] = await preview_entity.async_start_preview(
238238
async_preview_updated
239239
)

homeassistant/components/statistics/sensor.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
async_track_state_change_event,
5151
)
5252
from homeassistant.helpers.reload import async_setup_reload_service
53-
from homeassistant.helpers.start import async_at_start
5453
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
5554
from homeassistant.util import dt as dt_util
5655
from homeassistant.util.enum import try_parse_enum
@@ -373,8 +372,7 @@ def __init__(
373372
self._update_listener: CALLBACK_TYPE | None = None
374373
self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None
375374

376-
@callback
377-
def async_start_preview(
375+
async def async_start_preview(
378376
self,
379377
preview_callback: Callable[[str, Mapping[str, Any]], None],
380378
) -> CALLBACK_TYPE:
@@ -392,7 +390,7 @@ def async_start_preview(
392390

393391
self._preview_callback = preview_callback
394392

395-
self._async_stats_sensor_startup(self.hass)
393+
await self._async_stats_sensor_startup()
396394
return self._call_on_remove_callbacks
397395

398396
@callback
@@ -413,25 +411,27 @@ def _async_stats_sensor_state_listener(
413411
if not self._preview_callback:
414412
self.async_write_ha_state()
415413

416-
@callback
417-
def _async_stats_sensor_startup(self, _: HomeAssistant) -> None:
418-
"""Add listener and get recorded state."""
414+
async def _async_stats_sensor_startup(self) -> None:
415+
"""Add listener and get recorded state.
416+
417+
Historical data needs to be loaded from the database first before we
418+
can start accepting new incoming changes.
419+
This is needed to ensure that the buffer is properly sorted by time.
420+
"""
419421
_LOGGER.debug("Startup for %s", self.entity_id)
422+
if "recorder" in self.hass.config.components:
423+
await self._initialize_from_database()
420424
self.async_on_remove(
421425
async_track_state_change_event(
422426
self.hass,
423427
[self._source_entity_id],
424428
self._async_stats_sensor_state_listener,
425429
)
426430
)
427-
if "recorder" in self.hass.config.components:
428-
self.hass.async_create_task(self._initialize_from_database())
429431

430432
async def async_added_to_hass(self) -> None:
431433
"""Register callbacks."""
432-
self.async_on_remove(
433-
async_at_start(self.hass, self._async_stats_sensor_startup)
434-
)
434+
await self._async_stats_sensor_startup()
435435

436436
def _add_state_to_queue(self, new_state: State) -> None:
437437
"""Add the state to the queue."""
@@ -712,7 +712,9 @@ def _update_value(self) -> None:
712712
"""
713713

714714
value = self._state_characteristic_fn()
715-
715+
_LOGGER.debug(
716+
"Updating value: states: %s, ages: %s => %s", self.states, self.ages, value
717+
)
716718
if self._state_characteristic not in STATS_NOT_A_NUMBER:
717719
with contextlib.suppress(TypeError):
718720
value = round(cast(float, value), self._precision)

tests/components/statistics/test_sensor.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@
22

33
from __future__ import annotations
44

5+
from asyncio import Event as AsyncioEvent
56
from collections.abc import Sequence
67
from datetime import datetime, timedelta
78
import statistics
9+
from threading import Event
810
from typing import Any
911
from unittest.mock import patch
1012

1113
from freezegun import freeze_time
1214
import pytest
1315

1416
from homeassistant import config as hass_config
15-
from homeassistant.components.recorder import Recorder
17+
from homeassistant.components.recorder import Recorder, history
1618
from homeassistant.components.sensor import (
1719
ATTR_STATE_CLASS,
1820
SensorDeviceClass,
@@ -50,6 +52,7 @@
5052

5153
VALUES_BINARY = ["on", "off", "on", "off", "on", "off", "on", "off", "on"]
5254
VALUES_NUMERIC = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6]
55+
VALUES_NUMERIC_LINEAR = [1, 2, 3, 4, 5, 6, 7, 8, 9]
5356

5457

5558
async def test_unique_id(
@@ -1701,3 +1704,76 @@ async def test_device_id(
17011704
statistics_entity = entity_registry.async_get("sensor.statistics")
17021705
assert statistics_entity is not None
17031706
assert statistics_entity.device_id == source_entity.device_id
1707+
1708+
1709+
async def test_update_before_load(recorder_mock: Recorder, hass: HomeAssistant) -> None:
1710+
"""Verify that updates happening before reloading from the database are handled correctly."""
1711+
1712+
current_time = dt_util.utcnow()
1713+
1714+
# enable and pre-fill the recorder
1715+
await hass.async_block_till_done()
1716+
await async_wait_recording_done(hass)
1717+
1718+
with (
1719+
freeze_time(current_time) as freezer,
1720+
):
1721+
for value in VALUES_NUMERIC_LINEAR:
1722+
hass.states.async_set(
1723+
"sensor.test_monitored",
1724+
str(value),
1725+
{ATTR_UNIT_OF_MEASUREMENT: UnitOfTemperature.CELSIUS},
1726+
)
1727+
await hass.async_block_till_done()
1728+
current_time += timedelta(seconds=1)
1729+
freezer.move_to(current_time)
1730+
1731+
await async_wait_recording_done(hass)
1732+
1733+
# some synchronisation is needed to prevent that loading from the database finishes too soon
1734+
# we want this to take long enough to be able to try to add a value BEFORE loading is done
1735+
state_changes_during_period_called_evt = AsyncioEvent()
1736+
state_changes_during_period_stall_evt = Event()
1737+
real_state_changes_during_period = history.state_changes_during_period
1738+
1739+
def mock_state_changes_during_period(*args, **kwargs):
1740+
states = real_state_changes_during_period(*args, **kwargs)
1741+
hass.loop.call_soon_threadsafe(state_changes_during_period_called_evt.set)
1742+
state_changes_during_period_stall_evt.wait()
1743+
return states
1744+
1745+
# create the statistics component, get filled from database
1746+
with patch(
1747+
"homeassistant.components.statistics.sensor.history.state_changes_during_period",
1748+
mock_state_changes_during_period,
1749+
):
1750+
assert await async_setup_component(
1751+
hass,
1752+
"sensor",
1753+
{
1754+
"sensor": [
1755+
{
1756+
"platform": "statistics",
1757+
"name": "test",
1758+
"entity_id": "sensor.test_monitored",
1759+
"state_characteristic": "average_step",
1760+
"max_age": {"seconds": 10},
1761+
},
1762+
]
1763+
},
1764+
)
1765+
# adding this value is going to be ignored, since loading from the database hasn't finished yet
1766+
# if this value would be added before loading from the database is done
1767+
# it would mess up the order of the internal queue which is supposed to be sorted by time
1768+
await state_changes_during_period_called_evt.wait()
1769+
hass.states.async_set(
1770+
"sensor.test_monitored",
1771+
"10",
1772+
{ATTR_UNIT_OF_MEASUREMENT: DEGREE},
1773+
)
1774+
state_changes_during_period_stall_evt.set()
1775+
await hass.async_block_till_done()
1776+
1777+
# we will end up with a buffer of [1 .. 9] (10 wasn't added)
1778+
# so the computed average_step is 1+2+3+4+5+6+7+8/8 = 4.5
1779+
assert float(hass.states.get("sensor.test").state) == pytest.approx(4.5)

0 commit comments

Comments
 (0)