11import asyncio
22from collections import defaultdict
3+ from functools import wraps
34from uuid import uuid4
45
5- from fastapi_websocket_pubsub import ALL_TOPICS
66from loguru import logger
77from opal_client .data .updater import DataUpdater
8- from opal_common .schemas .data import DataUpdate
9- from tenacity import retry , wait_fixed , stop_after_delay
8+ from opal_common .schemas .data import DataUpdate , DataUpdateReport
109
1110from horizon .config import sidecar_config
1211
1312
1413class DataUpdateSubscriber :
1514 def __init__ (self , updater : DataUpdater ):
1615 self ._updater = updater
16+ self ._updater ._should_send_reports = True
1717 self ._notifier_id = uuid4 ().hex
1818 self ._update_listeners : dict [str , asyncio .Event ] = defaultdict (asyncio .Event )
19- self ._register_callbacks ()
19+ self ._inject_subscriber ()
2020
21- # Sometimes a request is sent before the client is created, so we retry the registration
22- @retry (wait = wait_fixed (1 ), stop = stop_after_delay (10 ), reraise = True )
23- def _register_callbacks (self ) -> None :
24- """
25- Register the on_message callback for incoming messages from all topics subscribed by the PubSub client.
26- """
27- callbacks = self ._updater ._client ._callbacks # noqa
28- if ALL_TOPICS not in callbacks :
29- callbacks [ALL_TOPICS ] = []
30- callbacks [ALL_TOPICS ].append (self ._on_message )
21+ def _inject_subscriber (self ):
22+ reporter = self ._updater .callbacks_reporter
23+ reporter .report_update_results = self ._reports_callback_decorator (
24+ reporter .report_update_results
25+ )
3126
32- async def _on_message (self , topic : str = "" , data : dict | None = None ) -> None :
33- """
34- Callback for incoming messages from the PubSub client.
35- """
36- if data is None :
37- logger .debug (f"Received message on topic { topic !r} without data" )
38- return
27+ def _reports_callback_decorator (self , func ):
28+ @wraps (func )
29+ async def wrapper (report : DataUpdateReport , * args , ** kwargs ):
30+ if report .update_id is not None :
31+ self ._resolve_listeners (report .update_id )
32+ else :
33+ logger .debug ("Received report without update ID" )
34+ return await func (report , * args , ** kwargs )
3935
40- update_id = data .get ("id" )
41- if update_id is None :
42- logger .debug (
43- f"Received message on topic { topic !r} without an update ID: { data } "
44- )
45- return
36+ return wrapper
4637
38+ def _resolve_listeners (self , update_id : str ) -> None :
4739 event = self ._update_listeners .get (update_id )
4840 if event is not None :
4941 logger .debug (
50- f"Received message on topic { topic !r } with update ID { update_id !r} , resolving listener(s)"
42+ f"Received acknowledgment for update ID { update_id !r} , resolving listener(s)"
5143 )
5244 event .set ()
5345 else :
5446 logger .debug (
55- f"Received message on topic { topic !r } with update ID { update_id !r} , but no listener found"
47+ f"Received acknowledgment for update ID { update_id !r} , but no listener found"
5648 )
5749
5850 async def wait_for_message (
@@ -71,7 +63,6 @@ async def wait_for_message(
7163 event .wait (),
7264 timeout = timeout ,
7365 )
74- await asyncio .sleep (sidecar_config .LOCAL_FACT_POST_ACK_SLEEP_S )
7566 return True
7667 except asyncio .TimeoutError :
7768 logger .warning (f"Timeout waiting for update id={ update_id !r} " )
0 commit comments