Skip to content

Commit 4348627

Browse files
committed
refactor method to use a queue
1 parent f8caff2 commit 4348627

File tree

1 file changed

+18
-13
lines changed
  • merino/providers/suggest/finance/backends/polygon

1 file changed

+18
-13
lines changed

merino/providers/suggest/finance/backends/polygon/backend.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ class PolygonBackend:
9696
url_single_ticker_snapshot: str
9797
url_single_ticker_overview: str
9898
filemanager: PolygonFilemanager
99+
cache_refresh_task: asyncio.Task
100+
cache_refresh_task_queue: asyncio.Queue
99101

100102
def __init__(
101103
self,
@@ -137,6 +139,9 @@ def __init__(
137139
SCRIPT_ID_BULK_WRITE_TICKERS, LUA_SCRIPT_CACHE_BULK_WRITE_TICKERS
138140
)
139141

142+
self.cache_refresh_task_queue = asyncio.Queue()
143+
self.cache_refresh_task = asyncio.create_task(self.refresh_ticker_cache_entries())
144+
140145
async def get_snapshots(self, tickers: list[str]) -> list[TickerSnapshot]:
141146
"""Get snapshots for the list of tickers."""
142147
# check the cache first.
@@ -403,26 +408,26 @@ async def store_snapshots_in_cache(self, snapshots: list[TickerSnapshot]) -> Non
403408
)
404409

405410
async def refresh_ticker_cache_entries(
406-
self, tickers: list[str], *, await_store: bool = False
411+
self
407412
) -> None:
408413
"""Refresh ticker snapshot in cache. Fetches new snapshot from upstream API and
409414
fires a background task to write it to the cache.
410-
411-
Note: Only awaits the cache write process if await_store is true. Used only for testing.
412415
"""
413-
snapshots = await self.get_snapshots(tickers)
416+
while True:
417+
# Get the tickers list from the queue
418+
# NOTE: get_nowait() can throw QueueEmpty exception
419+
tickers: list[str] = self.cache_refresh_task_queue.get_nowait()
420+
snapshots = await self.get_snapshots(tickers)
414421

415-
# early exit if no snapshots returned.
416-
if len(snapshots) == 0:
417-
return
422+
# Early exit if no snapshots returned.
423+
# Although, the store method also has the same check.
424+
if len(snapshots) == 0:
425+
return
418426

419-
# this parameter is only used for testing.
420-
if await_store:
421427
await self.store_snapshots_in_cache(snapshots)
422-
else:
423-
task = asyncio.create_task(self.store_snapshots_in_cache(snapshots))
424-
# consume/log
425-
task.add_done_callback(lambda t: t.exception())
428+
429+
# notify queue that the task is done
430+
self.cache_refresh_task_queue.task_done()
426431

427432
# TODO @herraj add unit tests for this
428433
def _parse_cached_data(

0 commit comments

Comments
 (0)