|
1 | 1 | import asyncio |
2 | 2 | from typing import Any |
3 | 3 |
|
| 4 | +from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| 5 | +from apscheduler.triggers.interval import IntervalTrigger |
4 | 6 | from loguru import logger |
5 | 7 |
|
6 | 8 | from app.services.catalog import DynamicCatalogService |
@@ -38,21 +40,28 @@ class BackgroundCatalogUpdater: |
38 | 40 |
|
39 | 41 | def __init__(self, interval_seconds: int) -> None: |
40 | 42 | self.interval_seconds = max(60, interval_seconds) |
41 | | - self._task: asyncio.Task | None = None |
42 | | - self._stop_event = asyncio.Event() |
| 43 | + self.scheduler = AsyncIOScheduler() |
43 | 44 |
|
44 | 45 | def start(self) -> None: |
45 | | - if self._task is not None: |
| 46 | + if self.scheduler.running: |
46 | 47 | return |
47 | | - self._stop_event.clear() |
48 | | - self._task = asyncio.create_task(self._run()) |
| 48 | + |
| 49 | + logger.info(f"Starting background catalog updater. Interval: {self.interval_seconds}s") |
| 50 | + self.scheduler.add_job( |
| 51 | + self.refresh_all_tokens, |
| 52 | + trigger=IntervalTrigger(seconds=self.interval_seconds), |
| 53 | + id="catalog_refresh", |
| 54 | + replace_existing=True, |
| 55 | + max_instances=1, # Prevent new job from starting if previous one is still running |
| 56 | + coalesce=True, # If multiple runs are missed, only run once |
| 57 | + ) |
| 58 | + self.scheduler.start() |
49 | 59 |
|
50 | 60 | async def stop(self) -> None: |
51 | | - if self._task is None: |
52 | | - return |
53 | | - self._stop_event.set() |
54 | | - await self._task |
55 | | - self._task = None |
| 61 | + if self.scheduler.running: |
| 62 | + logger.info("Stopping background catalog updater...") |
| 63 | + self.scheduler.shutdown(wait=True) # Wait for running jobs to complete |
| 64 | + logger.info("Background catalog updater stopped.") |
56 | 65 |
|
57 | 66 | async def refresh_all_tokens(self) -> None: |
58 | 67 | """Refresh catalogs for all tokens concurrently with a semaphore.""" |
@@ -89,18 +98,6 @@ async def _update_safe(key: str, payload: dict[str, Any]) -> None: |
89 | 98 | except Exception as exc: |
90 | 99 | logger.error(f"Catalog refresh scan failed: {exc}", exc_info=True) |
91 | 100 |
|
92 | | - async def _run(self) -> None: |
93 | | - logger.info(f"Background catalog updater started. Interval: {self.interval_seconds}s") |
94 | | - try: |
95 | | - while not self._stop_event.is_set(): |
96 | | - await self.refresh_all_tokens() |
97 | | - try: |
98 | | - await asyncio.wait_for(self._stop_event.wait(), timeout=self.interval_seconds) |
99 | | - except TimeoutError: |
100 | | - continue |
101 | | - finally: |
102 | | - logger.info("Background catalog updater stopped.") |
103 | | - |
104 | 101 | @staticmethod |
105 | 102 | def _has_credentials(payload: dict[str, Any]) -> bool: |
106 | 103 | return bool(payload.get("authKey") or (payload.get("username") and payload.get("password"))) |
|
0 commit comments