Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions oxygent/databases/db_redis/local_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
requiring an actual Elasticsearch server.
"""

import asyncio
import json
import time
from collections import deque
from typing import Dict, Union
from typing import Dict, Optional, Union

from ...config import Config

Expand All @@ -27,20 +28,22 @@ class LocalRedis:
- Value type validation and conversion
"""

def __init__(self):
def __init__(self, *, yield_on_ops: bool = True):
self.data: Dict[str, deque] = {}
self.expiry: Dict[str, float] = {}
self.default_expire_time = Config.get_redis_expire_time()
self.default_list_max_size = Config.get_redis_max_size()
self.default_list_max_length = Config.get_redis_max_length() * 1024
# When True, each mutating/read pop yields the event loop once for fairness.
self._yield_on_ops = yield_on_ops

async def lpush(
self,
key: str,
*values: Union[bytes, int, str, float, dict],
ex: int = None,
max_size: int = None,
max_length: int = None,
ex: Optional[int] = None,
max_size: Optional[int] = None,
max_length: Optional[int] = None,
) -> int:
"""Push one or more values to the left (head) of a list.

Expand Down Expand Up @@ -96,6 +99,10 @@ async def lpush(
reversed(new_values)
) # Use reserved to ensure proper order
self.expiry[key] = time.time() + ex

if self._yield_on_ops:
await asyncio.sleep(0)

return len(self.data[key])

async def rpop(self, key: str) -> Union[str, bytes, int, float, None]:
Expand All @@ -116,7 +123,17 @@ async def rpop(self, key: str) -> Union[str, bytes, int, float, None]:
"""
self._check_expiry(key)
if key in self.data and self.data[key]:
return self.data[key].pop()
item = self.data[key].pop()

# Yield after a successful pop so producers/other tasks get a turn too
if self._yield_on_ops:
await asyncio.sleep(0)

return item

# Optional tiny yield even on empty pops helps polling loops be nicer
if self._yield_on_ops:
await asyncio.sleep(0)
return None

def _check_expiry(self, key: str):
Expand All @@ -129,7 +146,8 @@ def _check_expiry(self, key: str):
del self.data[key]
del self.expiry[key]

async def close(
self,
): # This method is async to maintain compatibility with the Redis interface
pass
async def close(self):
# This method is async to maintain compatibility with the Redis interface
# Async for interface compatibility
if self._yield_on_ops:
await asyncio.sleep(0)