From d1689dd42eea302ee1d3aafb543b99e18ded9134 Mon Sep 17 00:00:00 2001 From: andrewzhao <55998423+yaozcoderepo@users.noreply.github.com> Date: Thu, 6 Nov 2025 21:34:24 -0500 Subject: [PATCH] Make LocalRedis asynchronous by adding asyncio.sleep(0) --- oxygent/databases/db_redis/local_redis.py | 38 +++++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/oxygent/databases/db_redis/local_redis.py b/oxygent/databases/db_redis/local_redis.py index be80253..dc644ea 100644 --- a/oxygent/databases/db_redis/local_redis.py +++ b/oxygent/databases/db_redis/local_redis.py @@ -5,10 +5,11 @@ requiring an actual Elasticsearch server. """ +import asyncio import json import time from collections import deque -from typing import Dict, Union +from typing import Dict, Optional, Union from ...config import Config @@ -27,20 +28,22 @@ class LocalRedis: - Value type validation and conversion """ - def __init__(self): + def __init__(self, *, yield_on_ops: bool = True): self.data: Dict[str, deque] = {} self.expiry: Dict[str, float] = {} self.default_expire_time = Config.get_redis_expire_time() self.default_list_max_size = Config.get_redis_max_size() self.default_list_max_length = Config.get_redis_max_length() * 1024 + # When True, each mutating/read pop yields the event loop once for fairness. + self._yield_on_ops = yield_on_ops async def lpush( self, key: str, *values: Union[bytes, int, str, float, dict], - ex: int = None, - max_size: int = None, - max_length: int = None, + ex: Optional[int] = None, + max_size: Optional[int] = None, + max_length: Optional[int] = None, ) -> int: """Push one or more values to the left (head) of a list. @@ -96,6 +99,10 @@ async def lpush( reversed(new_values) ) # Use reserved to ensure proper order self.expiry[key] = time.time() + ex + + if self._yield_on_ops: + await asyncio.sleep(0) + return len(self.data[key]) async def rpop(self, key: str) -> Union[str, bytes, int, float, None]: @@ -116,7 +123,17 @@ async def rpop(self, key: str) -> Union[str, bytes, int, float, None]: """ self._check_expiry(key) if key in self.data and self.data[key]: - return self.data[key].pop() + item = self.data[key].pop() + + # Yield after a successful pop so producers/other tasks get a turn too + if self._yield_on_ops: + await asyncio.sleep(0) + + return item + + # Optional tiny yield even on empty pops helps polling loops be nicer + if self._yield_on_ops: + await asyncio.sleep(0) return None def _check_expiry(self, key: str): @@ -129,7 +146,8 @@ def _check_expiry(self, key: str): del self.data[key] del self.expiry[key] - async def close( - self, - ): # This method is async to maintain compatibility with the Redis interface - pass + async def close(self): + # This method is async to maintain compatibility with the Redis interface + # Async for interface compatibility + if self._yield_on_ops: + await asyncio.sleep(0)