55from asyncio import Task , create_task , gather
66from collections .abc import Awaitable , Callable
77from dataclasses import replace
8- from datetime import UTC , datetime
8+ from datetime import UTC , datetime , timedelta
99from functools import wraps
1010import logging
1111from typing import Any , Final , TypeVar , cast
@@ -595,14 +595,14 @@ def _check_timestamp_is_recent(
595595 return False
596596 return True
597597
598- async def _energy_log_records_load_from_cache (self ) -> bool :
598+ async def _energy_log_records_load_from_cache (self ) -> bool : # noqa: PLR0912
599599 """Load energy_log_record from cache."""
600600 if (cache_data := self ._get_cache (CACHE_ENERGY_COLLECTION )) is None :
601601 _LOGGER .warning (
602602 "Failed to restore energy log records from cache for node %s" , self .name
603603 )
604604 return False
605- restored_logs : dict [int , list [int ]] = {}
605+ restored_logs : dict [int , dict [int , tuple [ datetime , int ] ]] = {}
606606 if cache_data == "" :
607607 _LOGGER .debug ("Cache-record is empty" )
608608 return False
@@ -615,24 +615,41 @@ async def _energy_log_records_load_from_cache(self) -> bool:
615615 if len (timestamp_energy_log ) == 6 :
616616 address = int (log_fields [0 ])
617617 slot = int (log_fields [1 ])
618- self ._energy_counters .add_pulse_log (
619- address = address ,
620- slot = slot ,
621- timestamp = datetime (
622- year = int (timestamp_energy_log [0 ]),
623- month = int (timestamp_energy_log [1 ]),
624- day = int (timestamp_energy_log [2 ]),
625- hour = int (timestamp_energy_log [3 ]),
626- minute = int (timestamp_energy_log [4 ]),
627- second = int (timestamp_energy_log [5 ]),
628- tzinfo = UTC ,
629- ),
630- pulses = int (log_fields [3 ]),
631- import_only = True ,
618+ pulses = int (log_fields [3 ])
619+ timestamp = datetime (
620+ year = int (timestamp_energy_log [0 ]),
621+ month = int (timestamp_energy_log [1 ]),
622+ day = int (timestamp_energy_log [2 ]),
623+ hour = int (timestamp_energy_log [3 ]),
624+ minute = int (timestamp_energy_log [4 ]),
625+ second = int (timestamp_energy_log [5 ]),
626+ tzinfo = UTC ,
632627 )
633628 if restored_logs .get (address ) is None :
634- restored_logs [address ] = []
635- restored_logs [address ].append (slot )
629+ restored_logs [address ] = {}
630+ restored_logs [address ][slot ] = (timestamp , pulses )
631+
632+ # Sort and prune the records loaded from cache
633+ sorted_logs : dict [int , dict [int , tuple [datetime , int ]]] = {}
634+ skip_before = datetime .now (tz = UTC ) - timedelta (hours = DAY_IN_HOURS )
635+ sorted_addresses = sorted (restored_logs .keys (), reverse = True )
636+ for address in sorted_addresses :
637+ sorted_slots = sorted (restored_logs [address ].keys (), reverse = True )
638+ for slot in sorted_slots :
639+ if restored_logs [address ][slot ][0 ] > skip_before :
640+ if sorted_logs .get (address ) is None :
641+ sorted_logs [address ] = {}
642+ sorted_logs [address ][slot ] = restored_logs [address ][slot ]
643+
644+ for address , data in sorted_logs .items ():
645+ for slot , pulse_data in data .items ():
646+ self ._energy_counters .add_pulse_log (
647+ address = address ,
648+ slot = slot ,
649+ pulses = pulse_data [1 ],
650+ timestamp = pulse_data [0 ],
651+ import_only = True ,
652+ )
636653
637654 self ._energy_counters .update ()
638655
0 commit comments