55from asyncio import Task , create_task
66from collections .abc import Awaitable , Callable
77from dataclasses import replace
8- from datetime import UTC , datetime
8+ from datetime import UTC , datetime , timedelta
99from functools import wraps
1010import logging
1111from math import floor
@@ -603,14 +603,14 @@ def _check_timestamp_is_recent(
603603 return False
604604 return True
605605
606- async def _energy_log_records_load_from_cache (self ) -> bool :
606+ async def _energy_log_records_load_from_cache (self ) -> bool : # noqa: PLR0912
607607 """Load energy_log_record from cache."""
608608 if (cache_data := self ._get_cache (CACHE_ENERGY_COLLECTION )) is None :
609609 _LOGGER .warning (
610610 "Failed to restore energy log records from cache for node %s" , self .name
611611 )
612612 return False
613- restored_logs : dict [int , list [int ]] = {}
613+ restored_logs : dict [int , dict [int , tuple [ datetime , int ] ]] = {}
614614 if cache_data == "" :
615615 _LOGGER .debug ("Cache-record is empty" )
616616 return False
@@ -623,24 +623,41 @@ async def _energy_log_records_load_from_cache(self) -> bool:
623623 if len (timestamp_energy_log ) == 6 :
624624 address = int (log_fields [0 ])
625625 slot = int (log_fields [1 ])
626- self ._energy_counters .add_pulse_log (
627- address = address ,
628- slot = slot ,
629- timestamp = datetime (
630- year = int (timestamp_energy_log [0 ]),
631- month = int (timestamp_energy_log [1 ]),
632- day = int (timestamp_energy_log [2 ]),
633- hour = int (timestamp_energy_log [3 ]),
634- minute = int (timestamp_energy_log [4 ]),
635- second = int (timestamp_energy_log [5 ]),
636- tzinfo = UTC ,
637- ),
638- pulses = int (log_fields [3 ]),
639- import_only = True ,
626+ pulses = int (log_fields [3 ])
627+ timestamp = datetime (
628+ year = int (timestamp_energy_log [0 ]),
629+ month = int (timestamp_energy_log [1 ]),
630+ day = int (timestamp_energy_log [2 ]),
631+ hour = int (timestamp_energy_log [3 ]),
632+ minute = int (timestamp_energy_log [4 ]),
633+ second = int (timestamp_energy_log [5 ]),
634+ tzinfo = UTC ,
640635 )
641636 if restored_logs .get (address ) is None :
642- restored_logs [address ] = []
643- restored_logs [address ].append (slot )
637+ restored_logs [address ] = {}
638+ restored_logs [address ][slot ] = (timestamp , pulses )
639+
640+ # Sort and prune the records loaded from cache
641+ sorted_logs : dict [int , dict [int , tuple [datetime , int ]]] = {}
642+ skip_before = datetime .now (tz = UTC ) - timedelta (hours = DAY_IN_HOURS )
643+ sorted_addresses = sorted (restored_logs .keys (), reverse = True )
644+ for address in sorted_addresses :
645+ sorted_slots = sorted (restored_logs [address ].keys (), reverse = True )
646+ for slot in sorted_slots :
647+ if restored_logs [address ][slot ][0 ] > skip_before :
648+ if sorted_logs .get (address ) is None :
649+ sorted_logs [address ] = {}
650+ sorted_logs [address ][slot ] = restored_logs [address ][slot ]
651+
652+ for address , data in sorted_logs .items ():
653+ for slot , pulse_data in data .items ():
654+ self ._energy_counters .add_pulse_log (
655+ address = address ,
656+ slot = slot ,
657+ pulses = pulse_data [1 ],
658+ timestamp = pulse_data [0 ],
659+ import_only = True ,
660+ )
644661
645662 self ._energy_counters .update ()
646663
0 commit comments