22
33from __future__ import annotations
44
5- from asyncio import Task , create_task
5+ from asyncio import Task , create_task , gather
66from collections .abc import Awaitable , Callable
77from dataclasses import replace
88from datetime import UTC , datetime
99from functools import wraps
1010import logging
11+ from math import ceil
1112from typing import Any , Final , TypeVar , cast
1213
1314from ..api import (
2425)
2526from ..connection import StickController
2627from ..constants import (
28+ DAY_IN_HOURS ,
2729 DEFAULT_CONS_INTERVAL ,
2830 MAX_TIME_DRIFT ,
2931 MINIMAL_POWER_UPDATE ,
7274# Default firmware if not known
7375DEFAULT_FIRMWARE : Final = datetime (2008 , 8 , 26 , 15 , 46 , tzinfo = UTC )
7476
77+ MAX_LOG_HOURS = DAY_IN_HOURS
78+
7579FuncT = TypeVar ("FuncT" , bound = Callable [..., Any ])
7680_LOGGER = logging .getLogger (__name__ )
7781
@@ -114,7 +118,6 @@ def __init__(
114118 self ._energy_counters = EnergyCounters (mac )
115119 self ._retrieve_energy_logs_task : None | Task [None ] = None
116120 self ._last_energy_log_requested : bool = False
117- self ._last_collected_energy_timestamp : datetime | None = None
118121
119122 self ._group_member : list [int ] = []
120123
@@ -364,8 +367,8 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
364367
365368 # Always request last energy log records at initial startup
366369 if not self ._last_energy_log_requested :
367- self ._last_energy_log_requested , _ = await self .energy_log_update (
368- self ._current_log_address
370+ self ._last_energy_log_requested = await self .energy_log_update (
371+ self ._current_log_address , save_cache = False
369372 )
370373
371374 if self ._energy_counters .log_rollover :
@@ -378,26 +381,25 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
378381 return None
379382
380383 # Try collecting energy-stats for _current_log_address
381- result , _ = await self .energy_log_update (self ._current_log_address )
384+ result = await self .energy_log_update (self ._current_log_address , save_cache = True )
382385 if not result :
383386 _LOGGER .debug (
384- "async_energy_update | %s | Log rollover | energy_log_update failed" ,
387+ "async_energy_update | %s | Log rollover | energy_log_update from address %s failed" ,
385388 self ._mac_in_str ,
389+ self ._current_log_address ,
386390 )
387391 return None
388392
389393 if self ._current_log_address is not None :
390394 # Retry with previous log address as Circle node pointer to self._current_log_address
391395 # could be rolled over while the last log is at previous address/slot
392- _prev_log_address , _ = calc_log_address (
393- self ._current_log_address , 1 , - 4
394- )
395- result , _ = await self .energy_log_update (_prev_log_address )
396+ prev_log_address , _ = calc_log_address (self ._current_log_address , 1 , - 4 )
397+ result = await self .energy_log_update (prev_log_address , save_cache = True )
396398 if not result :
397399 _LOGGER .debug (
398- "async_energy_update | %s | Log rollover | energy_log_update %s failed" ,
400+ "async_energy_update | %s | Log rollover | energy_log_update from address %s failed" ,
399401 self ._mac_in_str ,
400- _prev_log_address ,
402+ prev_log_address ,
401403 )
402404 return None
403405
@@ -413,7 +415,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
413415 return self ._energy_counters .energy_statistics
414416
415417 if len (missing_addresses ) == 1 :
416- result , _ = await self .energy_log_update (missing_addresses [0 ])
418+ result = await self .energy_log_update (missing_addresses [0 ], save_cache = True )
417419 if result :
418420 await self .power_update ()
419421 _LOGGER .debug (
@@ -452,64 +454,38 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
452454 return None
453455
454456 async def _get_initial_energy_logs (self ) -> None :
455- """Collect initial energy logs from the last 10 log addresses ."""
457+ """Collect initial energy logs for recent hours up to MAX_LOG_HOURS ."""
456458 if self ._current_log_address is None :
457459 return
458460
461+ if self .energy_consumption_interval is None :
462+ return
463+
459464 _LOGGER .debug (
460- "Start collecting initial energy logs from the last 10 log addresses for node %s." ,
465+ "Start collecting today's energy logs for node %s." ,
461466 self ._mac_in_str ,
462467 )
463- total_addresses = 11
468+
469+ # When only consumption is measured, 1 address contains data from 4 hours
470+ # When both consumption and production are measured, 1 address contains data from 2 hours
471+ cons_only = self .energy_production_interval is None
472+ factor = 4 if cons_only else 2
473+ max_addresses_to_collect = MAX_LOG_HOURS // factor
474+ total_addresses = min (
475+ max_addresses_to_collect , ceil (datetime .now (tz = UTC ).hour / factor ) + 1
476+ )
464477 log_address = self ._current_log_address
465- prev_address_timestamp : datetime | None = None
466478 while total_addresses > 0 :
467- result , empty_log = await self .energy_log_update (log_address )
468- if result and empty_log :
469- # Handle case with None-data in all address slots
479+ result = await self .energy_log_update (log_address , save_cache = False )
480+ if not result :
481+ # Stop initial log collection when an address contains no (None) or outdated data
482+ # Outdated data can indicate a EnergyLog address rollover: from address 6014 to 0
470483 _LOGGER .debug (
471- "Energy None-data collected from log address %s, stopping collection" ,
484+ "All slots at log address %s are empty or outdated – stopping initial collection" ,
472485 log_address ,
473486 )
474487 break
475488
476- # Check if the most recent timestamp of an earlier address is recent
477- # (within 2/4 * log_interval plus 5 mins margin)
478- log_interval = self .energy_consumption_interval
479- factor = 2 if self .energy_production_interval is not None else 4
480-
481- if log_interval is not None :
482- max_gap_minutes = (factor * log_interval ) + 5
483- if log_address == self ._current_log_address :
484- if (
485- self ._last_collected_energy_timestamp is not None
486- and (
487- datetime .now (tz = UTC ) - self ._last_collected_energy_timestamp
488- ).total_seconds ()
489- // 60
490- > max_gap_minutes
491- ):
492- _LOGGER .debug (
493- "Energy data collected from the current log address is outdated, stopping collection"
494- )
495- break
496- elif (
497- prev_address_timestamp is not None
498- and self ._last_collected_energy_timestamp is not None
499- and (
500- prev_address_timestamp - self ._last_collected_energy_timestamp
501- ).total_seconds ()
502- // 60
503- > max_gap_minutes
504- ):
505- _LOGGER .debug (
506- "Collected energy data is outdated, stopping collection"
507- )
508- break
509-
510- if self ._last_collected_energy_timestamp is not None :
511- prev_address_timestamp = self ._last_collected_energy_timestamp
512-
513489 log_address , _ = calc_log_address (log_address , 1 , - 4 )
514490 total_addresses -= 1
515491
@@ -535,80 +511,98 @@ async def get_missing_energy_logs(self) -> None:
535511 )
536512 missing_addresses = sorted (missing_addresses , reverse = True )
537513 tasks = [
538- create_task (self .energy_log_update (address ))
514+ create_task (self .energy_log_update (address , save_cache = False ))
539515 for address in missing_addresses
540516 ]
541- for task in tasks :
542- await task
517+ for idx , task in enumerate (tasks ):
518+ result = await task
519+ # When an energy log collection task returns False, stop and cancel the remaining tasks
520+ if not result :
521+ to_cancel = tasks [idx + 1 :]
522+ for t in to_cancel :
523+ t .cancel ()
524+ # Drain cancellations to avoid "Task exception was never retrieved"
525+ await gather (* to_cancel , return_exceptions = True )
526+ break
543527
544528 if self ._cache_enabled :
545529 await self ._energy_log_records_save_to_cache ()
546530
547- async def energy_log_update (self , address : int | None ) -> tuple [bool , bool ]:
548- """Request energy log statistics from node. Returns true if successful."""
549- empty_log = False
550- result = False
531+ async def energy_log_update (self , address : int | None , save_cache : bool = True ) -> bool :
532+ """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
533+ any_record_stored = False
551534 if address is None :
552- return result , empty_log
535+ return False
553536
554537 _LOGGER .debug (
555- "Request of energy log at address %s for node %s" ,
556- str ( address ) ,
557- self . name ,
538+ "Requesting EnergyLogs from node %s address %s" ,
539+ self . _mac_in_str ,
540+ address ,
558541 )
559542 request = CircleEnergyLogsRequest (self ._send , self ._mac_in_bytes , address )
560543 if (response := await request .send ()) is None :
561544 _LOGGER .debug (
562- "Retrieving of energy log at address %s for node %s failed" ,
563- str (address ),
545+ "Retrieving EnergyLogs data from node %s failed" ,
564546 self ._mac_in_str ,
565547 )
566- return result , empty_log
548+ return False
567549
568- _LOGGER .debug ("EnergyLogs data from %s, address=%s" , self ._mac_in_str , address )
550+ _LOGGER .debug ("EnergyLogs from node %s, address=%s: " , self ._mac_in_str , address )
569551 await self ._available_update_state (True , response .timestamp )
570- energy_record_update = False
571552
572553 # Forward historical energy log information to energy counters
573554 # Each response message contains 4 log counters (slots) of the
574555 # energy pulses collected during the previous hour of given timestamp
575- last_energy_timestamp_collected = False
576556 for _slot in range (4 , 0 , - 1 ):
577557 log_timestamp , log_pulses = response .log_data [_slot ]
578558 _LOGGER .debug (
579559 "In slot=%s: pulses=%s, timestamp=%s" , _slot , log_pulses , log_timestamp
580560 )
581- if log_timestamp is None or log_pulses is None :
561+ if (
562+ log_timestamp is None
563+ or log_pulses is None
564+ # Don't store an old log record; store an empty record instead
565+ or not self ._check_timestamp_is_recent (address , _slot , log_timestamp )
566+ ):
582567 self ._energy_counters .add_empty_log (response .log_address , _slot )
583- empty_log = True
584- elif await self ._energy_log_record_update_state (
568+ continue
569+
570+ await self ._energy_log_record_update_state (
585571 response .log_address ,
586572 _slot ,
587573 log_timestamp .replace (tzinfo = UTC ),
588574 log_pulses ,
589575 import_only = True ,
590- ):
591- energy_record_update = True
592- if not last_energy_timestamp_collected :
593- # Collect the timestamp of the most recent response
594- self ._last_collected_energy_timestamp = log_timestamp .replace (
595- tzinfo = UTC
596- )
597- _LOGGER .debug (
598- "Setting last_collected_energy_timestamp to %s" ,
599- self ._last_collected_energy_timestamp ,
600- )
601- last_energy_timestamp_collected = True
576+ )
577+ any_record_stored = True
602578
603- result = True
604579 self ._energy_counters .update ()
605- if energy_record_update :
580+ if any_record_stored and self . _cache_enabled and save_cache :
606581 _LOGGER .debug (
607582 "Saving energy record update to cache for %s" , self ._mac_in_str
608583 )
609584 await self .save_cache ()
610585
611- return result , empty_log
586+ return any_record_stored
587+
588+ def _check_timestamp_is_recent (
589+ self , address : int , slot : int , timestamp : datetime
590+ ) -> bool :
591+ """Check if a log record timestamp is within the last MAX_LOG_HOURS hours."""
592+ age_seconds = max (
593+ 0.0 ,
594+ (datetime .now (tz = UTC ) - timestamp .replace (tzinfo = UTC )).total_seconds ()
595+ )
596+ if age_seconds > MAX_LOG_HOURS * 3600 :
597+ _LOGGER .warning (
598+ "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring..." ,
599+ self ._mac_in_str ,
600+ address ,
601+ slot ,
602+ timestamp ,
603+ )
604+ return False
605+ return True
612606
613607 async def _energy_log_records_load_from_cache (self ) -> bool :
614608 """Load energy_log_record from cache."""
0 commit comments