Skip to content

Commit 908a0bd

Browse files
committed
refactor method to use a queue
1 parent 6fa4e2e commit 908a0bd

File tree

1 file changed

+18
-13
lines changed
  • merino/providers/suggest/finance/backends/polygon

1 file changed

+18
-13
lines changed

merino/providers/suggest/finance/backends/polygon/backend.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class PolygonBackend(FinanceBackend):
9494
url_single_ticker_snapshot: str
9595
url_single_ticker_overview: str
9696
filemanager: PolygonFilemanager
97+
cache_refresh_task: asyncio.Task
98+
cache_refresh_task_queue: asyncio.Queue
9799

98100
def __init__(
99101
self,
@@ -133,6 +135,9 @@ def __init__(
133135
SCRIPT_ID_BULK_WRITE_TICKERS, LUA_SCRIPT_CACHE_BULK_WRITE_TICKERS
134136
)
135137

138+
self.cache_refresh_task_queue = asyncio.Queue()
139+
self.cache_refresh_task = asyncio.create_task(self.refresh_ticker_cache_entries())
140+
136141
async def get_snapshots(self, tickers: list[str]) -> list[TickerSnapshot]:
137142
"""Get snapshots for the list of tickers."""
138143
snapshots: list[TickerSnapshot] = []
@@ -368,26 +373,26 @@ async def store_snapshots_in_cache(self, snapshots: list[TickerSnapshot]) -> Non
368373
)
369374

370375
async def refresh_ticker_cache_entries(
371-
self, tickers: list[str], *, await_store: bool = False
376+
self
372377
) -> None:
373378
"""Refresh ticker snapshot in cache. Fetches new snapshot from upstream API and
374379
fires a background task to write it to the cache.
375-
376-
Note: Only awaits the cache write process if await_store is true. Used only for testing.
377380
"""
378-
snapshots = await self.get_snapshots(tickers)
381+
while True:
382+
# Get the tickers list from the queue
383+
# NOTE: get_nowait() can throw QueueEmpty exception
384+
tickers: list[str] = self.cache_refresh_task_queue.get_nowait()
385+
snapshots = await self.get_snapshots(tickers)
379386

380-
# early exit if no snapshots returned.
381-
if len(snapshots) == 0:
382-
return
387+
# Early exit if no snapshots returned.
388+
# Although, the store method also has the same check.
389+
if len(snapshots) == 0:
390+
return
383391

384-
# this parameter is only used for testing.
385-
if await_store:
386392
await self.store_snapshots_in_cache(snapshots)
387-
else:
388-
task = asyncio.create_task(self.store_snapshots_in_cache(snapshots))
389-
# consume/log
390-
task.add_done_callback(lambda t: t.exception())
393+
394+
# notify queue that the task is done
395+
self.cache_refresh_task_queue.task_done()
391396

392397
# TODO @herraj add unit tests for this
393398
def _parse_cached_data(

0 commit comments

Comments
 (0)