66import re
77import time
88from contextlib import suppress
9+ from random import randint
910from threading import RLock , Thread
1011from typing import List , Optional , Union
1112from warnings import warn
@@ -371,22 +372,21 @@ def _check_retry_after(
371372
372373 async def _keepalive_task (self ) -> None :
373374 retry_after = 0
375+ response : ClientResponse | None = None
374376 if self ._timeout_task is not None :
375377 task_name = self ._timeout_task .get_name ()
376378 else :
377379 task_name = f"{ KEEPALIVE_TASK_NAME } - possible internal error"
378380 LOG .debug ("creating %s" , task_name )
379- response = None
380381 with self ._attribute_lock :
381382 if self ._authenticated is None :
382383 raise RuntimeError (
383384 "Keepalive task is running without an authenticated event"
384385 )
385386 while self ._authenticated .is_set ():
386387 relogin_interval = self .relogin_interval * 60
387- if (
388- relogin_interval != 0
389- and time .time () - self ._last_login_time > relogin_interval
388+ if relogin_interval != 0 and time .time () - self ._last_login_time > randint (
389+ int (0.75 * relogin_interval ), relogin_interval
390390 ):
391391 LOG .info ("Login timeout reached, re-logging in" )
392392 # FIXME?: should we just pause the task?
@@ -396,13 +396,9 @@ async def _keepalive_task(self) -> None:
396396 with suppress (Exception ):
397397 await self ._sync_task
398398 await self ._do_logout_query ()
399- response = await self ._do_login_query ()
400- if response is None :
401- LOG .error (
402- "%s could not re-login to ADT Pulse, exiting..." , task_name
403- )
399+ if not await self .async_quick_relogin ():
400+ LOG .error ("%s could not re-login, exiting" , task_name )
404401 return
405- close_response (response )
406402 if self ._sync_task is not None :
407403 coro = self ._sync_check_task ()
408404 self ._sync_task = asyncio .create_task (
@@ -509,6 +505,27 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:
509505 """
510506 return self ._pulse_connection .loop
511507
508+ async def async_quick_relogin (self ) -> bool :
509+ """Quickly re-login to Pulse.
510+
511+ Doesn't do device queries or set connected event unless a failure occurs.
512+ FIXME: Should probably just re-work login logic."""
513+ response = await self ._do_login_query ()
514+ if not handle_response (response , logging .ERROR , "Could not re-login to Pulse" ):
515+ await self .async_logout ()
516+ return False
517+ return True
518+
519+ def quick_relogin (self ) -> bool :
520+ """Perform quick_relogin synchronously."""
521+ coro = self .async_quick_relogin ()
522+ return asyncio .run_coroutine_threadsafe (
523+ coro ,
524+ self ._pulse_connection .check_sync (
525+ "Attempting to do call sync quick re-login from async"
526+ ),
527+ ).result ()
528+
512529 async def _do_login_query (self , timeout : int = 30 ) -> ClientResponse | None :
513530 try :
514531 retval = await self ._pulse_connection .async_query (
@@ -635,9 +652,9 @@ async def async_logout(self) -> None:
635652
636653 def logout (self ) -> None :
637654 """Log out of ADT Pulse."""
638- loop = self ._pulse_connection .loop
639- if loop is None :
640- raise RuntimeError ( "Attempting to call sync logout without sync login" )
655+ loop = self ._pulse_connection .check_sync (
656+ "Attempting to call sync logout without sync login"
657+ )
641658 sync_thread = self ._session_thread
642659
643660 coro = self .async_logout ()
@@ -655,14 +672,17 @@ async def _sync_check_task(self) -> None:
655672 LOG .debug ("creating %s" , task_name )
656673 response = None
657674 retry_after = 0
675+ last_sync_text = "0-0-0"
658676 if self ._updates_exist is None :
659677 raise RuntimeError (f"{ task_name } started without update event initialized" )
660- have_update = False
678+ have_updates = False
661679 while True :
662680 try :
663- pi = self .site .gateway .poll_interval
664- if have_update :
665- pi = pi / 2.0
681+ self .site .gateway .adjust_backoff_poll_interval ()
682+ if not have_updates :
683+ pi = self .site .gateway .poll_interval
684+ else :
685+ pi = 0.0
666686 if retry_after == 0 :
667687 await asyncio .sleep (pi )
668688 else :
@@ -684,35 +704,32 @@ async def _sync_check_task(self) -> None:
684704 ):
685705 close_response (response )
686706 continue
687-
707+ close_response ( response )
688708 pattern = r"\d+[-]\d+[-]\d+"
689709 if not re .match (pattern , text ):
690710 LOG .warning (
691711 "Unexpected sync check format (%s), forcing re-auth" , pattern
692712 )
693713 LOG .debug ("Received %s from ADT Pulse site" , text )
694- close_response (response )
695714 await self ._do_logout_query ()
696- await self .async_login ()
715+ if not await self .async_quick_relogin ():
716+ LOG .error ("%s couldn't re-login, exiting." , task_name )
697717 continue
698-
699- # we can have 0-0-0 followed by 1-0-0 followed by 2-0-0, etc
700- # wait until these settle
701- if text .endswith ("-0-0" ):
702- LOG .debug (
703- "Sync token %s indicates updates may exist, requerying" , text
704- )
705- close_response (response )
706- have_update = True
718+ if text != last_sync_text :
719+ LOG .debug ("Updates exist: %s, requerying" , text )
720+ last_sync_text = text
721+ have_updates = True
707722 continue
708- if have_update :
709- have_update = False
723+ if have_updates :
724+ have_updates = False
710725 if await self .async_update () is False :
711726 LOG .debug ("Pulse data update from %s failed" , task_name )
712727 continue
713728 self ._updates_exist .set ()
714- LOG .debug ("Sync token %s indicates no remote updates to process" , text )
715- close_response (response )
729+ else :
730+ LOG .debug (
731+ "Sync token %s indicates no remote updates to process" , text
732+ )
716733
717734 except asyncio .CancelledError :
718735 LOG .debug ("%s cancelled" , task_name )
@@ -802,11 +819,13 @@ def update(self) -> bool:
802819 Returns:
803820 bool: True on success
804821 """
805- loop = self ._pulse_connection .loop
806- if loop is None :
807- raise RuntimeError ("Attempting to run sync update from async login" )
808822 coro = self .async_update ()
809- return asyncio .run_coroutine_threadsafe (coro , loop ).result ()
823+ return asyncio .run_coroutine_threadsafe (
824+ coro ,
825+ self ._pulse_connection .check_sync (
826+ "Attempting to run sync update from async login"
827+ ),
828+ ).result ()
810829
811830 @property
812831 def sites (self ) -> List [ADTPulseSite ]:
0 commit comments