55from asyncio import Task , create_task , gather
66from collections .abc import Awaitable , Callable
77from dataclasses import replace
8- from datetime import UTC , datetime
8+ from datetime import UTC , datetime , timedelta
99from functools import wraps
1010import logging
1111from math import ceil
@@ -604,14 +604,14 @@ def _check_timestamp_is_recent(
604604 return False
605605 return True
606606
607- async def _energy_log_records_load_from_cache (self ) -> bool :
607+ async def _energy_log_records_load_from_cache (self ) -> bool : # noqa: PLR0912
608608 """Load energy_log_record from cache."""
609609 if (cache_data := self ._get_cache (CACHE_ENERGY_COLLECTION )) is None :
610610 _LOGGER .warning (
611611 "Failed to restore energy log records from cache for node %s" , self .name
612612 )
613613 return False
614- restored_logs : dict [int , list [int ]] = {}
614+ restored_logs : dict [int , dict [int , tuple [ datetime , int ] ]] = {}
615615 if cache_data == "" :
616616 _LOGGER .debug ("Cache-record is empty" )
617617 return False
@@ -624,24 +624,41 @@ async def _energy_log_records_load_from_cache(self) -> bool:
624624 if len (timestamp_energy_log ) == 6 :
625625 address = int (log_fields [0 ])
626626 slot = int (log_fields [1 ])
627- self ._energy_counters .add_pulse_log (
628- address = address ,
629- slot = slot ,
630- timestamp = datetime (
631- year = int (timestamp_energy_log [0 ]),
632- month = int (timestamp_energy_log [1 ]),
633- day = int (timestamp_energy_log [2 ]),
634- hour = int (timestamp_energy_log [3 ]),
635- minute = int (timestamp_energy_log [4 ]),
636- second = int (timestamp_energy_log [5 ]),
637- tzinfo = UTC ,
638- ),
639- pulses = int (log_fields [3 ]),
640- import_only = True ,
627+ pulses = int (log_fields [3 ])
628+ timestamp = datetime (
629+ year = int (timestamp_energy_log [0 ]),
630+ month = int (timestamp_energy_log [1 ]),
631+ day = int (timestamp_energy_log [2 ]),
632+ hour = int (timestamp_energy_log [3 ]),
633+ minute = int (timestamp_energy_log [4 ]),
634+ second = int (timestamp_energy_log [5 ]),
635+ tzinfo = UTC ,
641636 )
642637 if restored_logs .get (address ) is None :
643- restored_logs [address ] = []
644- restored_logs [address ].append (slot )
638+ restored_logs [address ] = {}
639+ restored_logs [address ][slot ] = (timestamp , pulses )
640+
641+ # Sort and prune the records loaded from cache
642+ sorted_logs : dict [int , dict [int , tuple [datetime , int ]]] = {}
643+ skip_before = datetime .now (tz = UTC ) - timedelta (hours = DAY_IN_HOURS )
644+ sorted_addresses = sorted (restored_logs .keys (), reverse = True )
645+ for address in sorted_addresses :
646+ sorted_slots = sorted (restored_logs [address ].keys (), reverse = True )
647+ for slot in sorted_slots :
648+ if restored_logs [address ][slot ][0 ] > skip_before :
649+ if sorted_logs .get (address ) is None :
650+ sorted_logs [address ] = {}
651+ sorted_logs [address ][slot ] = restored_logs [address ][slot ]
652+
653+ for address , data in sorted_logs .items ():
654+ for slot , pulse_data in data .items ():
655+ self ._energy_counters .add_pulse_log (
656+ address = address ,
657+ slot = slot ,
658+ pulses = pulse_data [1 ],
659+ timestamp = pulse_data [0 ],
660+ import_only = True ,
661+ )
645662
646663 self ._energy_counters .update ()
647664
0 commit comments