Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions base_cacheable_class/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@
except ImportError:
# Redis is optional
pass

try:
from .cache.valkey import ValkeyCache, ValkeyCacheDecorator, ValkeyClientConfig

__all__.extend(["ValkeyCache", "ValkeyCacheDecorator", "ValkeyClientConfig"])
except ImportError:
# Valkey is optional
pass
48 changes: 48 additions & 0 deletions base_cacheable_class/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from .in_memory.cache import InMemoryCache
from .in_memory.decorator import InMemoryCacheDecorator
from .redis.cache import RedisCache
from .redis.decorator import RedisCacheDecorator
from .valkey.cache import ValkeyCache
from .valkey.config import ValkeyClientConfig
from .valkey.decorator import ValkeyCacheDecorator


class AsyncCacheDecoratorFactory:
@classmethod
async def inmemory(cls, default_ttl: int = 60) -> InMemoryCacheDecorator:
cache = InMemoryCache()
return InMemoryCacheDecorator(cache, default_ttl)

@classmethod
async def redis(
cls,
host: str = "localhost",
port: int = 6379,
password: str = "yourpassword",
username: str = "yourusername",
db: int = 0,
socket_timeout: float = 0.5,
socket_connect_timeout: float = 0.5,
default_ttl: int = 60,
) -> RedisCacheDecorator:
cache = RedisCache(host, port, password, username, db, socket_timeout, socket_connect_timeout)
return RedisCacheDecorator(cache, default_ttl)

@classmethod
async def valkey(cls, config: ValkeyClientConfig | None = None, default_ttl: int = 60) -> ValkeyCacheDecorator:
if config is None:
config = ValkeyClientConfig.localhost()
cache = await ValkeyCache.create(config)
return ValkeyCacheDecorator(cache, default_ttl)

@classmethod
async def from_inmemory_cache(cls, cache: InMemoryCache, default_ttl: int = 60) -> InMemoryCacheDecorator:
return InMemoryCacheDecorator(cache, default_ttl)

@classmethod
async def from_redis_cache(cls, cache: RedisCache, default_ttl: int = 60) -> RedisCacheDecorator:
return RedisCacheDecorator(cache, default_ttl)

@classmethod
async def from_valkey_cache(cls, cache: ValkeyCache, default_ttl: int = 60) -> ValkeyCacheDecorator:
return ValkeyCacheDecorator(cache, default_ttl)
2 changes: 1 addition & 1 deletion base_cacheable_class/cache/redis/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from redis.asyncio import Redis

from base_cacheable_class import CacheInterface
from ...interfaces import CacheInterface


class RedisCache(CacheInterface):
Expand Down
5 changes: 5 additions & 0 deletions base_cacheable_class/cache/valkey/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .cache import ValkeyCache
from .config import ValkeyClientConfig
from .decorator import ValkeyCacheDecorator

__all__ = ["ValkeyCache", "ValkeyCacheDecorator", "ValkeyClientConfig"]
106 changes: 106 additions & 0 deletions base_cacheable_class/cache/valkey/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import pickle
import re
from typing import Any, cast

from glide import ExpirySet, ExpiryType, FlushMode, GlideClient

from ...interfaces.cache import CacheInterface
from .config import ValkeyClientConfig


class ValkeyCache(CacheInterface):
_config: ValkeyClientConfig
_client: GlideClient

# Recommend initialize with create method, not directly
def __init__(self, config: ValkeyClientConfig, client: GlideClient):
self._config = config
self._client = client

@classmethod
async def create(cls, config: ValkeyClientConfig) -> "ValkeyCache":
client = await GlideClient.create(config.to_glide_config())
return cls(config, client)

@property
def config(self) -> ValkeyClientConfig:
return self._config

async def _serialize(self, value: Any) -> bytes:
data = pickle.dumps(value)
return data

async def _deserialize(self, data: bytes | None) -> Any:
if data is None:
return None
data = pickle.loads(data) # noqa: S301
return data
Comment on lines +29 to +37
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think we can serialize with mspack
  2. Can we sperate serial/deserializer layer?

Copy link

@Ilevk Ilevk Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need @sigridjineth's opinion


async def get(self, key: str) -> Any:
data = await self._client.get(key)
deserialized_data = await self._deserialize(data)
if deserialized_data is None:
return None
if isinstance(deserialized_data, bytes):
return deserialized_data.decode("utf-8")
return deserialized_data

async def set(self, key: str, value: Any, ttl: int | None = None) -> None:
serialized_value = await self._serialize(value)
if ttl is not None:
ttl_ms = int(ttl * 1000)
expiry = ExpirySet(expiry_type=ExpiryType.MILLSEC, value=ttl_ms)
await self._client.set(key, serialized_value, expiry=expiry)
else:
await self._client.set(key, serialized_value)

async def exists(self, key: str) -> bool:
return await self._client.exists([key]) == 1

async def delete(self, key: str) -> None:
await self._client.delete([key])

async def clear(self) -> None:
await self._client.flushdb(flush_mode=FlushMode.ASYNC)

async def get_keys(self, pattern: str | None = None) -> list[str]:
if pattern is None:
pattern = "*"

cursor = b"0"
matched_keys = []
while True:
result: list[bytes | list[bytes]] = await self._client.scan(cursor=cursor, match=pattern)
cursor: bytes = cast(bytes, result[0])
keys: list[bytes] = cast(list[bytes], result[1])

decoded_keys = [k.decode() if isinstance(k, bytes) else k for k in keys]
matched_keys.extend(decoded_keys)

if cursor == b"0":
break
return matched_keys

async def get_keys_regex(self, target_func_name: str, pattern: str | None = None) -> list[str]:
cursor = b"0"
all_keys: list[str] = []
while True:
result: list[bytes | list[bytes]] = await self._client.scan(cursor=cursor, match=f"{target_func_name}*")
cursor: bytes = cast(bytes, result[0])
keys: list[bytes] = cast(list[bytes], result[1])

decoded_keys: list[str] = cast(list[str], [k.decode() if isinstance(k, bytes) else k for k in keys])
all_keys.extend(decoded_keys)

if cursor == b"0":
break
if not pattern:
return all_keys

return [k for k in all_keys if re.compile(pattern).search(k)]

async def ping(self) -> None:
await self._client.ping()

async def close(self) -> None:
await self._client.close()
62 changes: 62 additions & 0 deletions base_cacheable_class/cache/valkey/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dataclasses import dataclass, field

from glide import GlideClientConfiguration, NodeAddress


@dataclass
class ValkeyClientConfig:
"""
Simplified configuration class for Valkey client.
Includes only essential settings; others use default values.
"""

host: str = "localhost"
port: int = 6379
database_id: int = 0
use_tls: bool = False
request_timeout_ms: int | None = None
client_name: str | None = None
additional_nodes: list[tuple[str, int]] = field(default_factory=list)

def to_glide_config(self) -> GlideClientConfiguration:
"""
Convert ValkeyClientConfig to GlideClientConfiguration
"""
addresses = [NodeAddress(host=self.host, port=self.port)]

# Include additional nodes if present
for host, port in self.additional_nodes:
addresses.append(NodeAddress(host=host, port=port))

config = GlideClientConfiguration(
addresses=addresses,
use_tls=self.use_tls,
database_id=self.database_id,
)

# Optional settings
if self.request_timeout_ms is not None:
config.request_timeout = self.request_timeout_ms

if self.client_name is not None:
config.client_name = self.client_name

return config

@classmethod
def localhost(cls, port: int = 6379, database_id: int = 0) -> "ValkeyClientConfig":
return cls(host="localhost", port=port, database_id=database_id)

@classmethod
def remote(cls, host: str, port: int = 6379, use_tls: bool = False) -> "ValkeyClientConfig":
return cls(host=host, port=port, use_tls=use_tls)

@classmethod
def cluster(cls, nodes: list[tuple[str, int]], use_tls: bool = False) -> "ValkeyClientConfig":
if not nodes:
raise ValueError("At least one node must be provided for cluster configuration")

primary_host, primary_port = nodes[0]
additional_nodes = nodes[1:] if len(nodes) > 1 else []

return cls(host=primary_host, port=primary_port, additional_nodes=additional_nodes, use_tls=use_tls)
98 changes: 98 additions & 0 deletions base_cacheable_class/cache/valkey/decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging
from collections.abc import Callable
from functools import wraps
from typing import Any

from ...interfaces import CacheDecoratorInterface, CacheInterface

logger = logging.getLogger(__name__)


class ValkeyCacheDecorator(CacheDecoratorInterface):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p3. I'll return to fundamental questions. do we need a decorator implementation for each cache?

I think the sync/async decorator is enough, adding another kind of cache would make the source code duplicate a lot.

for example, redis cache decorator and valkey cache decorator have a similar interface and logic. (mostly same).

def __init__(self, cache: CacheInterface, default_ttl: int = 60):
self.cache = cache
self.default_ttl = default_ttl

def key_builder(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
arg_str = str(args)
kwarg_str = str(kwargs) if kwargs else "{}"
func_name = getattr(func, "__name__", "unknown")
return f"{func_name}:{arg_str}:{kwarg_str}"

def __call__(self, ttl: int | None = None) -> Callable[..., Callable[..., Any]]:
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
_key: str = self.key_builder(func, *args, **kwargs)
current_ttl: int = ttl if ttl is not None else self.default_ttl

try:
cached_value = await self.cache.get(_key)
if cached_value is not None:
return cached_value

result = await func(*args, **kwargs)
if result is not None:
await self.cache.set(_key, result, ttl=current_ttl)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p3. even if result is None, I think it's better to cache. it helps the code not query a source repeatedly.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But redis / valkey don’t allow to set None value. so you should find other way.
In my case, I used a special value to represent None in valkey. e.g) '_NA'

return result

except (ConnectionError, TimeoutError) as e:
logger.warning(f"Redis connection or timeout issue: {e}, falling back.")
return await func(*args, **kwargs)

except Exception as e:
logger.error(f"Error in cache decorator: {e}")
return await func(*args, **kwargs)

return wrapper

return decorator

def invalidate(
self, target_func_name: str, param_mapping: dict[str, str] | None = None
) -> Callable[..., Callable[..., Any]]:
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
pattern = rf"{target_func_name}:\(.*\):{{.*}}"

if param_mapping:
kwargs_patterns: list[str] = [
rf".*'{k}':\s*'{v!s}'"
for k, v in {
t_param: kwargs[s_param]
for t_param, s_param in param_mapping.items()
if s_param in kwargs
}.items()
]
pattern = rf"{target_func_name}:\(.*\):{{" + ".*".join(kwargs_patterns) + ".*}"

cached_keys: list[str] = await self.cache.get_keys_regex(
target_func_name=target_func_name, pattern=pattern
)
for cache_key in cached_keys:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p3. you can use pipeline to send commands at once. but it might need to add interface for pipeline in cache class.

await self.cache.delete(cache_key)

except Exception as e:
logger.error(f"Error in cache invalidation: {e}")

return await func(*args, **kwargs)

return wrapper

return decorator

def invalidate_all(self) -> Callable[..., Callable[..., Any]]:
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
await self.cache.clear()
except Exception as e:
logger.error(f"Error in cache clear: {e}")
return await func(*args, **kwargs)

return wrapper

return decorator
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
]
dependencies = [
"redis>=6.2.0",
"valkey-glide>=2.0.1",
]
Comment on lines 26 to 29
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make dependency optional


[project.optional-dependencies]
Expand Down
Loading