11from asyncio import gather
2- from collections import defaultdict
2+ from collections import defaultdict , deque
33from contextlib import asynccontextmanager
4+ from queue import PriorityQueue
45from typing import Any , AsyncGenerator
56
67import backoff
3738
3839context : BrowserContext
3940dynamic_subs : dict [str , set [Subscription ]] = defaultdict (set )
40- cache = FIFOCache (100 )
4141client = AsyncClient (
4242 headers = {
4343 "user-agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
4848)
4949
5050
51+ class Cache :
52+ data : set [str ]
53+ queue : PriorityQueue [str ]
54+
55+ def __init__ (self ) -> None :
56+ self .data = set ()
57+ self .queue = PriorityQueue ()
58+
59+ def push (self , item : str ) -> None :
60+ if item in self .data :
61+ return
62+ self .queue .put_nowait (item )
63+ self .data .add (item )
64+
65+ def pop (self ) -> str :
66+ item = self .queue .get_nowait ()
67+ self .data .remove (item )
68+ return item
69+
70+ def replace (self , item : str ) -> str | None :
71+ if item < self .queue .queue [0 ] or item in self .data :
72+ return
73+ self .push (item )
74+ return self .pop ()
75+
76+
77+ cache = Cache ()
78+
79+
5180@driver .on_startup
5281async def _ () -> None :
5382 global context
@@ -74,7 +103,7 @@ async def _() -> None:
74103
75104 for page in range (4 , - 1 , - 1 ):
76105 for item in (await get_dynamics (page ))["items" ]:
77- cache [ item ["id_str" ]] = None
106+ cache . push ( item ["id_str" ])
78107
79108 async with get_session () as session :
80109 for sub in await session .scalars (select (Subscription )):
@@ -88,8 +117,7 @@ async def _() -> None:
88117
89118 dynamics = []
90119 for dynamic in (await get_dynamics ())["items" ]:
91- if dynamic ["id_str" ] not in cache :
92- cache [dynamic ["id_str" ]] = None
120+ if cache .replace (dynamic ["id_str" ]):
93121 dynamics .append (dynamic )
94122
95123 run_task (
0 commit comments