From 53de3086f7a2176dea1e804676d2eb7f618c41cf Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Mon, 1 Jan 2024 12:13:04 +0200 Subject: [PATCH 001/196] Client side caching invalidations (standalone) (#3089) * cache invalidations * isort * deamon thread * remove threads * delete comment * tests * skip if hiredis available * async * review comments * docstring * decode test * fix test * fix decode response test --- redis/__init__.py | 2 - redis/_parsers/resp3.py | 66 ++++++++++------ redis/asyncio/client.py | 131 ++++++++++++++++++++++++++----- redis/asyncio/connection.py | 4 + redis/cache.py | 18 +++-- redis/client.py | 37 +++++++-- redis/cluster.py | 2 +- redis/connection.py | 6 ++ tests/test_asyncio/test_cache.py | 129 ++++++++++++++++++++++++++++++ tests/test_cache.py | 119 ++++++++++++++++++++++++++++ 10 files changed, 457 insertions(+), 57 deletions(-) create mode 100644 tests/test_asyncio/test_cache.py create mode 100644 tests/test_cache.py diff --git a/redis/__init__.py b/redis/__init__.py index 7bf6839453..495d2d99bb 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -2,7 +2,6 @@ from redis import asyncio # noqa from redis.backoff import default_backoff -from redis.cache import _LocalChace from redis.client import Redis, StrictRedis from redis.cluster import RedisCluster from redis.connection import ( @@ -62,7 +61,6 @@ def int_or_str(value): VERSION = tuple([99, 99, 99]) __all__ = [ - "_LocalChace", "AuthenticationError", "AuthenticationWrongNumberOfArgsError", "BlockingConnectionPool", diff --git a/redis/_parsers/resp3.py b/redis/_parsers/resp3.py index 569e7ee679..13aa1ffccb 100644 --- a/redis/_parsers/resp3.py +++ b/redis/_parsers/resp3.py @@ -6,15 +6,18 @@ from .base import _AsyncRESPBase, _RESPBase from .socket import SERVER_CLOSED_CONNECTION_ERROR +_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"] + class _RESP3Parser(_RESPBase): """RESP3 protocol implementation""" def __init__(self, socket_read_size): super().__init__(socket_read_size) - self.push_handler_func = self.handle_push_response + self.pubsub_push_handler_func = self.handle_pubsub_push_response + self.invalidations_push_handler_func = None - def handle_push_response(self, response): + def handle_pubsub_push_response(self, response): logger = getLogger("push_response") logger.info("Push response: " + str(response)) return response @@ -114,13 +117,7 @@ def _read_response(self, disable_decoding=False, push_request=False): ) for _ in range(int(response)) ] - res = self.push_handler_func(response) - if not push_request: - return self._read_response( - disable_decoding=disable_decoding, push_request=push_request - ) - else: - return res + self.handle_push_response(response, disable_decoding, push_request) else: raise InvalidResponse(f"Protocol Error: {raw!r}") @@ -128,16 +125,32 @@ def _read_response(self, disable_decoding=False, push_request=False): response = self.encoder.decode(response) return response - def set_push_handler(self, push_handler_func): - self.push_handler_func = push_handler_func + def handle_push_response(self, response, disable_decoding, push_request): + if response[0] in _INVALIDATION_MESSAGE: + res = self.invalidation_push_handler_func(response) + else: + res = self.pubsub_push_handler_func(response) + if not push_request: + return self._read_response( + disable_decoding=disable_decoding, push_request=push_request + ) + else: + return res + + def set_pubsub_push_handler(self, pubsub_push_handler_func): + self.pubsub_push_handler_func = pubsub_push_handler_func + + def set_invalidation_push_handler(self, invalidations_push_handler_func): + self.invalidation_push_handler_func = invalidations_push_handler_func class _AsyncRESP3Parser(_AsyncRESPBase): def __init__(self, socket_read_size): super().__init__(socket_read_size) - self.push_handler_func = self.handle_push_response + self.pubsub_push_handler_func = self.handle_pubsub_push_response + self.invalidations_push_handler_func = None - def handle_push_response(self, response): + def handle_pubsub_push_response(self, response): logger = getLogger("push_response") logger.info("Push response: " + str(response)) return response @@ -246,13 +259,7 @@ async def _read_response( ) for _ in range(int(response)) ] - res = self.push_handler_func(response) - if not push_request: - return await self._read_response( - disable_decoding=disable_decoding, push_request=push_request - ) - else: - return res + await self.handle_push_response(response, disable_decoding, push_request) else: raise InvalidResponse(f"Protocol Error: {raw!r}") @@ -260,5 +267,20 @@ async def _read_response( response = self.encoder.decode(response) return response - def set_push_handler(self, push_handler_func): - self.push_handler_func = push_handler_func + async def handle_push_response(self, response, disable_decoding, push_request): + if response[0] in _INVALIDATION_MESSAGE: + res = self.invalidation_push_handler_func(response) + else: + res = self.pubsub_push_handler_func(response) + if not push_request: + return await self._read_response( + disable_decoding=disable_decoding, push_request=push_request + ) + else: + return res + + def set_pubsub_push_handler(self, pubsub_push_handler_func): + self.pubsub_push_handler_func = pubsub_push_handler_func + + def set_invalidation_push_handler(self, invalidations_push_handler_func): + self.invalidation_push_handler_func = invalidations_push_handler_func diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 9e0491f810..eea9612f4a 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -37,6 +37,12 @@ ) from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry +from redis.cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from redis.client import ( EMPTY_RESPONSE, NEVER_DECODE, @@ -60,7 +66,7 @@ TimeoutError, WatchError, ) -from redis.typing import ChannelT, EncodableT, KeyT +from redis.typing import ChannelT, EncodableT, KeysT, KeyT, ResponseT from redis.utils import ( HIREDIS_AVAILABLE, _set_info_logger, @@ -231,6 +237,13 @@ def __init__( redis_connect_func=None, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, + cache_enable: bool = False, + client_cache: Optional[_LocalCache] = None, + cache_max_size: int = 100, + cache_ttl: int = 0, + cache_eviction_policy: str = DEFAULT_EVICTION_POLICY, + cache_blacklist: List[str] = DEFAULT_BLACKLIST, + cache_whitelist: List[str] = DEFAULT_WHITELIST, ): """ Initialize a new Redis client. @@ -336,6 +349,16 @@ def __init__( # on a set of redis commands self._single_conn_lock = asyncio.Lock() + self.client_cache = client_cache + if cache_enable: + self.client_cache = _LocalCache( + cache_max_size, cache_ttl, cache_eviction_policy + ) + if self.client_cache is not None: + self.cache_blacklist = cache_blacklist + self.cache_whitelist = cache_whitelist + self.client_cache_initialized = False + def __repr__(self): return ( f"<{self.__class__.__module__}.{self.__class__.__name__}" @@ -350,6 +373,10 @@ async def initialize(self: _RedisT) -> _RedisT: async with self._single_conn_lock: if self.connection is None: self.connection = await self.connection_pool.get_connection("_") + if self.client_cache is not None: + self.connection._parser.set_invalidation_push_handler( + self._cache_invalidation_process + ) return self def set_response_callback(self, command: str, callback: ResponseCallbackT): @@ -568,6 +595,8 @@ async def aclose(self, close_connection_pool: Optional[bool] = None) -> None: close_connection_pool is None and self.auto_close_connection_pool ): await self.connection_pool.disconnect() + if self.client_cache: + self.client_cache.flush() @deprecated_function(version="5.0.1", reason="Use aclose() instead", name="close") async def close(self, close_connection_pool: Optional[bool] = None) -> None: @@ -596,29 +625,95 @@ async def _disconnect_raise(self, conn: Connection, error: Exception): ): raise error + def _cache_invalidation_process( + self, data: List[Union[str, Optional[List[str]]]] + ) -> None: + """ + Invalidate (delete) all redis commands associated with a specific key. + `data` is a list of strings, where the first string is the invalidation message + and the second string is the list of keys to invalidate. + (if the list of keys is None, then all keys are invalidated) + """ + if data[1] is not None: + for key in data[1]: + self.client_cache.invalidate(str_if_bytes(key)) + else: + self.client_cache.flush() + + async def _get_from_local_cache(self, command: str): + """ + If the command is in the local cache, return the response + """ + if ( + self.client_cache is None + or command[0] in self.cache_blacklist + or command[0] not in self.cache_whitelist + ): + return None + while not self.connection._is_socket_empty(): + await self.connection.read_response(push_request=True) + return self.client_cache.get(command) + + def _add_to_local_cache( + self, command: Tuple[str], response: ResponseT, keys: List[KeysT] + ): + """ + Add the command and response to the local cache if the command + is allowed to be cached + """ + if ( + self.client_cache is not None + and (self.cache_blacklist == [] or command[0] not in self.cache_blacklist) + and (self.cache_whitelist == [] or command[0] in self.cache_whitelist) + ): + self.client_cache.set(command, response, keys) + + def delete_from_local_cache(self, command: str): + """ + Delete the command from the local cache + """ + try: + self.client_cache.delete(command) + except AttributeError: + pass + # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): """Execute a command and return a parsed response""" await self.initialize() - options.pop("keys", None) # the keys are used only for client side caching - pool = self.connection_pool command_name = args[0] - conn = self.connection or await pool.get_connection(command_name, **options) + keys = options.pop("keys", None) # keys are used only for client side caching + response_from_cache = await self._get_from_local_cache(args) + if response_from_cache is not None: + return response_from_cache + else: + pool = self.connection_pool + conn = self.connection or await pool.get_connection(command_name, **options) - if self.single_connection_client: - await self._single_conn_lock.acquire() - try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_raise(conn, error), - ) - finally: if self.single_connection_client: - self._single_conn_lock.release() - if not self.connection: - await pool.release(conn) + await self._single_conn_lock.acquire() + try: + if self.client_cache is not None and not self.client_cache_initialized: + await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, "CLIENT", *("CLIENT", "TRACKING", "ON") + ), + lambda error: self._disconnect_raise(conn, error), + ) + self.client_cache_initialized = True + response = await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_raise(conn, error), + ) + self._add_to_local_cache(args, response, keys) + return response + finally: + if self.single_connection_client: + self._single_conn_lock.release() + if not self.connection: + await pool.release(conn) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -866,7 +961,7 @@ async def connect(self): else: await self.connection.connect() if self.push_handler_func is not None and not HIREDIS_AVAILABLE: - self.connection._parser.set_push_handler(self.push_handler_func) + self.connection._parser.set_pubsub_push_handler(self.push_handler_func) async def _disconnect_raise_connect(self, conn, error): """ diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index bbd438fc0b..39f75a5f13 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -645,6 +645,10 @@ def pack_commands(self, commands: Iterable[Iterable[EncodableT]]) -> List[bytes] output.append(SYM_EMPTY.join(pieces)) return output + def _is_socket_empty(self): + """Check if the socket is empty""" + return not self._reader.at_eof() + class Connection(AbstractConnection): "Manages TCP communication to and from a Redis server" diff --git a/redis/cache.py b/redis/cache.py index 5a689d0ebd..d920702339 100644 --- a/redis/cache.py +++ b/redis/cache.py @@ -159,7 +159,7 @@ class EvictionPolicy(Enum): RANDOM = "random" -class _LocalChace: +class _LocalCache: """ A caching mechanism for storing redis commands and their responses. @@ -220,6 +220,7 @@ def get(self, command: str) -> ResponseT: if command in self.cache: if self._is_expired(command): self.delete(command) + return self._update_access(command) return self.cache[command]["response"] @@ -266,28 +267,28 @@ def _update_access(self, command: str): Args: command (str): The redis command. """ - if self.eviction_policy == EvictionPolicy.LRU: + if self.eviction_policy == EvictionPolicy.LRU.value: self.cache.move_to_end(command) - elif self.eviction_policy == EvictionPolicy.LFU: + elif self.eviction_policy == EvictionPolicy.LFU.value: self.cache[command]["access_count"] = ( self.cache.get(command, {}).get("access_count", 0) + 1 ) self.cache.move_to_end(command) - elif self.eviction_policy == EvictionPolicy.RANDOM: + elif self.eviction_policy == EvictionPolicy.RANDOM.value: pass # Random eviction doesn't require updates def _evict(self): """Evict a redis command from the cache based on the eviction policy.""" if self._is_expired(self.commands_ttl_list[0]): self.delete(self.commands_ttl_list[0]) - elif self.eviction_policy == EvictionPolicy.LRU: + elif self.eviction_policy == EvictionPolicy.LRU.value: self.cache.popitem(last=False) - elif self.eviction_policy == EvictionPolicy.LFU: + elif self.eviction_policy == EvictionPolicy.LFU.value: min_access_command = min( self.cache, key=lambda k: self.cache[k].get("access_count", 0) ) self.cache.pop(min_access_command) - elif self.eviction_policy == EvictionPolicy.RANDOM: + elif self.eviction_policy == EvictionPolicy.RANDOM.value: random_command = random.choice(list(self.cache.keys())) self.cache.pop(random_command) @@ -322,5 +323,6 @@ def invalidate(self, key: KeyT): """ if key not in self.key_commands_map: return - for command in self.key_commands_map[key]: + commands = list(self.key_commands_map[key]) + for command in commands: self.delete(command) diff --git a/redis/client.py b/redis/client.py index 0af7e050d6..7f2c8d290d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -4,7 +4,7 @@ import time import warnings from itertools import chain -from typing import Any, Callable, Dict, List, Optional, Type, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from redis._parsers.encoders import Encoder from redis._parsers.helpers import ( @@ -17,7 +17,7 @@ DEFAULT_BLACKLIST, DEFAULT_EVICTION_POLICY, DEFAULT_WHITELIST, - _LocalChace, + _LocalCache, ) from redis.commands import ( CoreCommands, @@ -211,7 +211,7 @@ def __init__( credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, cache_enable: bool = False, - client_cache: Optional[_LocalChace] = None, + client_cache: Optional[_LocalCache] = None, cache_max_size: int = 100, cache_ttl: int = 0, cache_eviction_policy: str = DEFAULT_EVICTION_POLICY, @@ -326,12 +326,16 @@ def __init__( self.client_cache = client_cache if cache_enable: - self.client_cache = _LocalChace( + self.client_cache = _LocalCache( cache_max_size, cache_ttl, cache_eviction_policy ) if self.client_cache is not None: self.cache_blacklist = cache_blacklist self.cache_whitelist = cache_whitelist + self.client_tracking_on() + self.connection._parser.set_invalidation_push_handler( + self._cache_invalidation_process + ) def __repr__(self) -> str: return ( @@ -358,6 +362,21 @@ def set_response_callback(self, command: str, callback: Callable) -> None: """Set a custom Response Callback""" self.response_callbacks[command] = callback + def _cache_invalidation_process( + self, data: List[Union[str, Optional[List[str]]]] + ) -> None: + """ + Invalidate (delete) all redis commands associated with a specific key. + `data` is a list of strings, where the first string is the invalidation message + and the second string is the list of keys to invalidate. + (if the list of keys is None, then all keys are invalidated) + """ + if data[1] is not None: + for key in data[1]: + self.client_cache.invalidate(str_if_bytes(key)) + else: + self.client_cache.flush() + def load_external_module(self, funcname, func) -> None: """ This function can be used to add externally defined redis modules, @@ -530,6 +549,8 @@ def close(self): if self.auto_close_connection_pool: self.connection_pool.disconnect() + if self.client_cache: + self.client_cache.flush() def _send_command_parse_response(self, conn, command_name, *args, **options): """ @@ -561,9 +582,13 @@ def _get_from_local_cache(self, command: str): or command[0] not in self.cache_whitelist ): return None + while not self.connection._is_socket_empty(): + self.connection.read_response(push_request=True) return self.client_cache.get(command) - def _add_to_local_cache(self, command: str, response: ResponseT, keys: List[KeysT]): + def _add_to_local_cache( + self, command: Tuple[str], response: ResponseT, keys: List[KeysT] + ): """ Add the command and response to the local cache if the command is allowed to be cached @@ -819,7 +844,7 @@ def execute_command(self, *args): # were listening to when we were disconnected self.connection.register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: - self.connection._parser.set_push_handler(self.push_handler_func) + self.connection._parser.set_pubsub_push_handler(self.push_handler_func) connection = self.connection kwargs = {"check_health": not self.subscribed} if not self.subscribed: diff --git a/redis/cluster.py b/redis/cluster.py index 0405b0547c..8032173e66 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1778,7 +1778,7 @@ def execute_command(self, *args): # were listening to when we were disconnected self.connection.register_connect_callback(self.on_connect) if self.push_handler_func is not None and not HIREDIS_AVAILABLE: - self.connection._parser.set_push_handler(self.push_handler_func) + self.connection._parser.set_pubsub_push_handler(self.push_handler_func) connection = self.connection self._execute(connection, connection.send_command, *args) diff --git a/redis/connection.py b/redis/connection.py index c201224e35..35a4ff4a37 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,5 +1,6 @@ import copy import os +import select import socket import ssl import sys @@ -572,6 +573,11 @@ def pack_commands(self, commands): output.append(SYM_EMPTY.join(pieces)) return output + def _is_socket_empty(self): + """Check if the socket is empty""" + r, _, _ = select.select([self._sock], [], [], 0) + return not bool(r) + class Connection(AbstractConnection): "Manages TCP communication to and from a Redis server" diff --git a/tests/test_asyncio/test_cache.py b/tests/test_asyncio/test_cache.py new file mode 100644 index 0000000000..c837acfed1 --- /dev/null +++ b/tests/test_asyncio/test_cache.py @@ -0,0 +1,129 @@ +import time + +import pytest +import redis.asyncio as redis +from redis.utils import HIREDIS_AVAILABLE + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_get_from_cache(): + r = redis.Redis(cache_enable=True, single_connection_client=True, protocol=3) + r2 = redis.Redis(protocol=3) + # add key to redis + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == b"bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + await r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + await r.ping() + # the command is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == b"barbar" + + await r.aclose() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_cache_max_size(): + r = redis.Redis( + cache_enable=True, cache_max_size=3, single_connection_client=True, protocol=3 + ) + # add 3 keys to redis + await r.set("foo", "bar") + await r.set("foo2", "bar2") + await r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert await r.get("foo") == b"bar" + assert await r.get("foo2") == b"bar2" + assert await r.get("foo3") == b"bar3" + # get the 3 keys from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo2")) == b"bar2" + assert r.client_cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + await r.set("foo4", "bar4") + assert await r.get("foo4") == b"bar4" + # the first key is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + + await r.aclose() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_cache_ttl(): + r = redis.Redis( + cache_enable=True, cache_ttl=1, single_connection_client=True, protocol=3 + ) + # add key to redis + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == b"bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + # wait for the key to expire + time.sleep(1) + # the key is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + + await r.aclose() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_cache_lfu_eviction(): + r = redis.Redis( + cache_enable=True, + cache_max_size=3, + cache_eviction_policy="lfu", + single_connection_client=True, + protocol=3, + ) + # add 3 keys to redis + await r.set("foo", "bar") + await r.set("foo2", "bar2") + await r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert await r.get("foo") == b"bar" + assert await r.get("foo2") == b"bar2" + assert await r.get("foo3") == b"bar3" + # change the order of the keys in the cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + await r.set("foo4", "bar4") + assert await r.get("foo4") == b"bar4" + # test the eviction policy + assert len(r.client_cache.cache) == 3 + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo2")) is None + + await r.aclose() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_cache_decode_response(): + r = redis.Redis( + decode_responses=True, + cache_enable=True, + single_connection_client=True, + protocol=3, + ) + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == "bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + await r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + await r.ping() + # the command is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == "barbar" + + await r.aclose() diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000000..45621fe77e --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,119 @@ +import time + +import pytest +import redis +from redis.utils import HIREDIS_AVAILABLE + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_get_from_cache(): + r = redis.Redis(cache_enable=True, single_connection_client=True, protocol=3) + r2 = redis.Redis(protocol=3) + # add key to redis + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == b"bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + r.ping() + # the command is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == b"barbar" + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_cache_max_size(): + r = redis.Redis( + cache_enable=True, cache_max_size=3, single_connection_client=True, protocol=3 + ) + # add 3 keys to redis + r.set("foo", "bar") + r.set("foo2", "bar2") + r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert r.get("foo") == b"bar" + assert r.get("foo2") == b"bar2" + assert r.get("foo3") == b"bar3" + # get the 3 keys from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo2")) == b"bar2" + assert r.client_cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + r.set("foo4", "bar4") + assert r.get("foo4") == b"bar4" + # the first key is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_cache_ttl(): + r = redis.Redis( + cache_enable=True, cache_ttl=1, single_connection_client=True, protocol=3 + ) + # add key to redis + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == b"bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + # wait for the key to expire + time.sleep(1) + # the key is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_cache_lfu_eviction(): + r = redis.Redis( + cache_enable=True, + cache_max_size=3, + cache_eviction_policy="lfu", + single_connection_client=True, + protocol=3, + ) + # add 3 keys to redis + r.set("foo", "bar") + r.set("foo2", "bar2") + r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert r.get("foo") == b"bar" + assert r.get("foo2") == b"bar2" + assert r.get("foo3") == b"bar3" + # change the order of the keys in the cache + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + r.set("foo4", "bar4") + assert r.get("foo4") == b"bar4" + # test the eviction policy + assert len(r.client_cache.cache) == 3 + assert r.client_cache.get(("GET", "foo")) == b"bar" + assert r.client_cache.get(("GET", "foo2")) is None + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_cache_decode_response(): + r = redis.Redis( + decode_responses=True, + cache_enable=True, + single_connection_client=True, + protocol=3, + ) + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == "bar" + # get key from local cache + assert r.client_cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + r.ping() + # the command is not in the local cache anymore + assert r.client_cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == "barbar" From 6046d5f8a2a50eba9b47fc9a6253c87a24fbc4df Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Mon, 1 Jan 2024 13:22:15 +0200 Subject: [PATCH 002/196] fix acl_genpass with bits (#3062) --- redis/commands/core.py | 1 + tests/test_commands.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index f97724d030..8fbd0d9104 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -97,6 +97,7 @@ def acl_genpass(self, bits: Union[int, None] = None, **kwargs) -> ResponseT: b = int(bits) if b < 0 or b > 4096: raise ValueError + pieces.append(b) except ValueError: raise DataError( "genpass optionally accepts a bits argument, between 0 and 4096." diff --git a/tests/test_commands.py b/tests/test_commands.py index 6660c2c6b0..b2d7c1b9ed 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -201,8 +201,9 @@ def test_acl_genpass(self, r): r.acl_genpass(-5) r.acl_genpass(5555) - r.acl_genpass(555) + password = r.acl_genpass(555) assert isinstance(password, (str, bytes)) + assert len(password) == 139 @skip_if_server_version_lt("7.0.0") @skip_if_redis_enterprise() From aea575597d317930c00350cdda18e6acb3814958 Mon Sep 17 00:00:00 2001 From: Aarni Koskela Date: Mon, 1 Jan 2024 15:10:22 +0200 Subject: [PATCH 003/196] Partial clean up of Python 3.7 compatibility (#2928) * Drop typing_extensions dependency (not necessary when targeting Python 3.8+) * Bump python_requires to >=3.8, drop importlib-metadata shim dependency * Cease testing on Python 3.7 * Add 3.8 test --------- Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> --- .github/workflows/integration.yaml | 6 +++--- redis/_parsers/hiredis.py | 4 +--- redis/asyncio/client.py | 3 ++- redis/asyncio/connection.py | 3 ++- redis/commands/cluster.py | 2 +- redis/commands/core.py | 2 +- redis/compat.py | 6 ------ redis/typing.py | 3 +-- setup.py | 5 +---- 9 files changed, 12 insertions(+), 22 deletions(-) delete mode 100644 redis/compat.py diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 96b51fbafb..207d58fac7 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -57,7 +57,7 @@ jobs: max-parallel: 15 fail-fast: false matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] + python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] test-type: ['standalone', 'cluster'] connection-type: ['hiredis', 'plain'] env: @@ -111,7 +111,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.7', '3.11'] + python-version: ['3.8', '3.11'] test-type: ['standalone', 'cluster'] connection-type: ['hiredis', 'plain'] protocol: ['3'] @@ -160,7 +160,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] + python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 diff --git a/redis/_parsers/hiredis.py b/redis/_parsers/hiredis.py index 1919d3658e..a52dbbd013 100644 --- a/redis/_parsers/hiredis.py +++ b/redis/_parsers/hiredis.py @@ -1,15 +1,13 @@ import asyncio import socket import sys -from typing import Callable, List, Optional, Union +from typing import Callable, List, Optional, TypedDict, Union if sys.version_info.major >= 3 and sys.version_info.minor >= 11: from asyncio import timeout as async_timeout else: from async_timeout import timeout as async_timeout -from redis.compat import TypedDict - from ..exceptions import ConnectionError, InvalidResponse, RedisError from ..typing import EncodableT from ..utils import HIREDIS_AVAILABLE diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index eea9612f4a..79689fcb5e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -15,9 +15,11 @@ Mapping, MutableMapping, Optional, + Protocol, Set, Tuple, Type, + TypedDict, TypeVar, Union, cast, @@ -55,7 +57,6 @@ AsyncSentinelCommands, list_or_args, ) -from redis.compat import Protocol, TypedDict from redis.credentials import CredentialProvider from redis.exceptions import ( ConnectionError, diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 39f75a5f13..df2bd20f9f 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -17,9 +17,11 @@ List, Mapping, Optional, + Protocol, Set, Tuple, Type, + TypedDict, TypeVar, Union, ) @@ -34,7 +36,6 @@ from redis.asyncio.retry import Retry from redis.backoff import NoBackoff -from redis.compat import Protocol, TypedDict from redis.connection import DEFAULT_RESP_VERSION from redis.credentials import CredentialProvider, UsernamePasswordCredentialProvider from redis.exceptions import ( diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index af3a717c27..8dd463ed18 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -7,13 +7,13 @@ Iterable, Iterator, List, + Literal, Mapping, NoReturn, Optional, Union, ) -from redis.compat import Literal from redis.crc import key_slot from redis.exceptions import RedisClusterException, RedisError from redis.typing import ( diff --git a/redis/commands/core.py b/redis/commands/core.py index 8fbd0d9104..5d7b774dc3 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -12,6 +12,7 @@ Iterable, Iterator, List, + Literal, Mapping, Optional, Sequence, @@ -20,7 +21,6 @@ Union, ) -from redis.compat import Literal from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError from redis.typing import ( AbsExpiryT, diff --git a/redis/compat.py b/redis/compat.py deleted file mode 100644 index e478493467..0000000000 --- a/redis/compat.py +++ /dev/null @@ -1,6 +0,0 @@ -# flake8: noqa -try: - from typing import Literal, Protocol, TypedDict # lgtm [py/unused-import] -except ImportError: - from typing_extensions import Literal # lgtm [py/unused-import] - from typing_extensions import Protocol, TypedDict diff --git a/redis/typing.py b/redis/typing.py index d1cd5568a3..a5d1369d63 100644 --- a/redis/typing.py +++ b/redis/typing.py @@ -7,13 +7,12 @@ Awaitable, Iterable, Mapping, + Protocol, Type, TypeVar, Union, ) -from redis.compat import Protocol - if TYPE_CHECKING: from redis._parsers import Encoder from redis.asyncio.connection import ConnectionPool as AsyncConnectionPool diff --git a/setup.py b/setup.py index 8979b29b23..89aa2e6658 100644 --- a/setup.py +++ b/setup.py @@ -34,10 +34,8 @@ }, author="Redis Inc.", author_email="oss@redis.com", - python_requires=">=3.7", + python_requires=">=3.8", install_requires=[ - 'importlib-metadata >= 1.0; python_version < "3.8"', - 'typing-extensions; python_version<"3.8"', 'async-timeout>=4.0.2; python_full_version<="3.11.2"', ], classifiers=[ @@ -49,7 +47,6 @@ "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", From c6bbfcad5a25d5497845010a52eef8c06224f79e Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Mon, 1 Jan 2024 15:22:54 +0200 Subject: [PATCH 004/196] Version 5.1.0b1 (#3093) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 89aa2e6658..8329f00066 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ long_description_content_type="text/markdown", keywords=["Redis", "key-value store", "database"], license="MIT", - version="5.1.0a1", + version="5.1.0b1", packages=find_packages( include=[ "redis", From a63ba91d0df7ffed8e724a2d82cde4709a9196ec Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 2 Jan 2024 09:07:18 +0200 Subject: [PATCH 005/196] Bump github/codeql-action from 2 to 3 (#3096) Bumps [github/codeql-action](https://github.com/github/codeql-action) from 2 to 3. - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/v2...v3) --- updated-dependencies: - dependency-name: github/codeql-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 61da2fce55..4670c55b0f 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -40,7 +40,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -51,7 +51,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v2 + uses: github/codeql-action/autobuild@v3 # ℹ️ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -65,4 +65,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 From 4f3b894d67296118fe8cc3e98c9e1f02e33f6488 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 2 Jan 2024 09:07:27 +0200 Subject: [PATCH 006/196] Bump actions/upload-artifact from 3 to 4 (#3097) Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 3 to 4. - [Release notes](https://github.com/actions/upload-artifact/releases) - [Commits](https://github.com/actions/upload-artifact/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/upload-artifact dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/docs.yaml | 2 +- .github/workflows/integration.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml index 56f16fa2b0..f32afb6a90 100644 --- a/.github/workflows/docs.yaml +++ b/.github/workflows/docs.yaml @@ -40,7 +40,7 @@ jobs: invoke build-docs - name: upload docs - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: redis-py-docs path: | diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 207d58fac7..23f6674aac 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -81,7 +81,7 @@ jobs: sleep 10 # time to settle invoke ${{matrix.test-type}}-tests - - uses: actions/upload-artifact@v3 + - uses: actions/upload-artifact@v4 if: success() || failure() with: name: pytest-results-${{matrix.test-type}}-${{matrix.connection-type}}-${{matrix.python-version}} From 6d77c6d715430c30f22147f8c572659d77380a9f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 2 Jan 2024 09:07:43 +0200 Subject: [PATCH 007/196] Bump actions/setup-python from 4 to 5 (#3095) Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4 to 5. - [Release notes](https://github.com/actions/setup-python/releases) - [Commits](https://github.com/actions/setup-python/compare/v4...v5) --- updated-dependencies: - dependency-name: actions/setup-python dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/docs.yaml | 2 +- .github/workflows/integration.yaml | 10 +++++----- .github/workflows/pypi-publish.yaml | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml index f32afb6a90..c5c74aa4d3 100644 --- a/.github/workflows/docs.yaml +++ b/.github/workflows/docs.yaml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: 3.9 cache: 'pip' diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 23f6674aac..f4ba256359 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -41,7 +41,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: 3.9 cache: 'pip' @@ -65,7 +65,7 @@ jobs: name: Python ${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}} tests steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -120,7 +120,7 @@ jobs: name: RESP3 [${{ matrix.python-version }} ${{matrix.test-type}}-${{matrix.connection-type}}] steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -147,7 +147,7 @@ jobs: extension: ['tar.gz', 'whl'] steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: 3.9 - name: Run installed unit tests @@ -163,7 +163,7 @@ jobs: python-version: ['3.8', '3.9', '3.10', '3.11', 'pypy-3.8', 'pypy-3.9'] steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.github/workflows/pypi-publish.yaml b/.github/workflows/pypi-publish.yaml index 4f8833372f..30720d7b8a 100644 --- a/.github/workflows/pypi-publish.yaml +++ b/.github/workflows/pypi-publish.yaml @@ -14,7 +14,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: install python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: 3.9 - name: Install dev tools From 8cbf7f5fa899675ef8fca09ec63f8588d351e297 Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Sun, 7 Jan 2024 14:15:58 +0200 Subject: [PATCH 008/196] Support client side caching with ConnectionPool (#3099) * sync * async * fixs connection mocks * fix async connection mock * fix test_asyncio/test_connection.py::test_single_connection * add test for cache blacklist and flushdb at the end of each test * fix review comments --- redis/{cache.py => _cache.py} | 6 +- redis/asyncio/client.py | 106 +++++--------------------- redis/asyncio/connection.py | 88 ++++++++++++++++++++- redis/client.py | 98 +++++------------------- redis/connection.py | 91 +++++++++++++++++++++- tests/conftest.py | 1 + tests/test_asyncio/conftest.py | 1 + tests/test_asyncio/test_cache.py | 83 +++++++++++--------- tests/test_asyncio/test_connection.py | 3 +- tests/test_cache.py | 87 ++++++++++++--------- 10 files changed, 318 insertions(+), 246 deletions(-) rename redis/{cache.py => _cache.py} (98%) diff --git a/redis/cache.py b/redis/_cache.py similarity index 98% rename from redis/cache.py rename to redis/_cache.py index d920702339..4255afb7a4 100644 --- a/redis/cache.py +++ b/redis/_cache.py @@ -178,7 +178,11 @@ class _LocalCache: """ def __init__( - self, max_size: int, ttl: int, eviction_policy: EvictionPolicy, **kwargs + self, + max_size: int = 100, + ttl: int = 0, + eviction_policy: EvictionPolicy = DEFAULT_EVICTION_POLICY, + **kwargs, ): self.max_size = max_size self.ttl = ttl diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 79689fcb5e..143d997757 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -25,6 +25,12 @@ cast, ) +from redis._cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from redis._parsers.helpers import ( _RedisCallbacks, _RedisCallbacksRESP2, @@ -39,12 +45,6 @@ ) from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry -from redis.cache import ( - DEFAULT_BLACKLIST, - DEFAULT_EVICTION_POLICY, - DEFAULT_WHITELIST, - _LocalCache, -) from redis.client import ( EMPTY_RESPONSE, NEVER_DECODE, @@ -67,7 +67,7 @@ TimeoutError, WatchError, ) -from redis.typing import ChannelT, EncodableT, KeysT, KeyT, ResponseT +from redis.typing import ChannelT, EncodableT, KeyT from redis.utils import ( HIREDIS_AVAILABLE, _set_info_logger, @@ -294,6 +294,13 @@ def __init__( "lib_version": lib_version, "redis_connect_func": redis_connect_func, "protocol": protocol, + "cache_enable": cache_enable, + "client_cache": client_cache, + "cache_max_size": cache_max_size, + "cache_ttl": cache_ttl, + "cache_eviction_policy": cache_eviction_policy, + "cache_blacklist": cache_blacklist, + "cache_whitelist": cache_whitelist, } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -350,16 +357,6 @@ def __init__( # on a set of redis commands self._single_conn_lock = asyncio.Lock() - self.client_cache = client_cache - if cache_enable: - self.client_cache = _LocalCache( - cache_max_size, cache_ttl, cache_eviction_policy - ) - if self.client_cache is not None: - self.cache_blacklist = cache_blacklist - self.cache_whitelist = cache_whitelist - self.client_cache_initialized = False - def __repr__(self): return ( f"<{self.__class__.__module__}.{self.__class__.__name__}" @@ -374,10 +371,6 @@ async def initialize(self: _RedisT) -> _RedisT: async with self._single_conn_lock: if self.connection is None: self.connection = await self.connection_pool.get_connection("_") - if self.client_cache is not None: - self.connection._parser.set_invalidation_push_handler( - self._cache_invalidation_process - ) return self def set_response_callback(self, command: str, callback: ResponseCallbackT): @@ -596,8 +589,6 @@ async def aclose(self, close_connection_pool: Optional[bool] = None) -> None: close_connection_pool is None and self.auto_close_connection_pool ): await self.connection_pool.disconnect() - if self.client_cache: - self.client_cache.flush() @deprecated_function(version="5.0.1", reason="Use aclose() instead", name="close") async def close(self, close_connection_pool: Optional[bool] = None) -> None: @@ -626,89 +617,28 @@ async def _disconnect_raise(self, conn: Connection, error: Exception): ): raise error - def _cache_invalidation_process( - self, data: List[Union[str, Optional[List[str]]]] - ) -> None: - """ - Invalidate (delete) all redis commands associated with a specific key. - `data` is a list of strings, where the first string is the invalidation message - and the second string is the list of keys to invalidate. - (if the list of keys is None, then all keys are invalidated) - """ - if data[1] is not None: - for key in data[1]: - self.client_cache.invalidate(str_if_bytes(key)) - else: - self.client_cache.flush() - - async def _get_from_local_cache(self, command: str): - """ - If the command is in the local cache, return the response - """ - if ( - self.client_cache is None - or command[0] in self.cache_blacklist - or command[0] not in self.cache_whitelist - ): - return None - while not self.connection._is_socket_empty(): - await self.connection.read_response(push_request=True) - return self.client_cache.get(command) - - def _add_to_local_cache( - self, command: Tuple[str], response: ResponseT, keys: List[KeysT] - ): - """ - Add the command and response to the local cache if the command - is allowed to be cached - """ - if ( - self.client_cache is not None - and (self.cache_blacklist == [] or command[0] not in self.cache_blacklist) - and (self.cache_whitelist == [] or command[0] in self.cache_whitelist) - ): - self.client_cache.set(command, response, keys) - - def delete_from_local_cache(self, command: str): - """ - Delete the command from the local cache - """ - try: - self.client_cache.delete(command) - except AttributeError: - pass - # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): """Execute a command and return a parsed response""" await self.initialize() command_name = args[0] keys = options.pop("keys", None) # keys are used only for client side caching - response_from_cache = await self._get_from_local_cache(args) + pool = self.connection_pool + conn = self.connection or await pool.get_connection(command_name, **options) + response_from_cache = await conn._get_from_local_cache(args) if response_from_cache is not None: return response_from_cache else: - pool = self.connection_pool - conn = self.connection or await pool.get_connection(command_name, **options) - if self.single_connection_client: await self._single_conn_lock.acquire() try: - if self.client_cache is not None and not self.client_cache_initialized: - await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, "CLIENT", *("CLIENT", "TRACKING", "ON") - ), - lambda error: self._disconnect_raise(conn, error), - ) - self.client_cache_initialized = True response = await conn.retry.call_with_retry( lambda: self._send_command_parse_response( conn, command_name, *args, **options ), lambda error: self._disconnect_raise(conn, error), ) - self._add_to_local_cache(args, response, keys) + conn._add_to_local_cache(args, response, keys) return response finally: if self.single_connection_client: diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index df2bd20f9f..7f1c0b71e4 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -47,9 +47,15 @@ ResponseError, TimeoutError, ) -from redis.typing import EncodableT +from redis.typing import EncodableT, KeysT, ResponseT from redis.utils import HIREDIS_AVAILABLE, get_lib_version, str_if_bytes +from .._cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from .._parsers import ( BaseParser, Encoder, @@ -114,6 +120,9 @@ class AbstractConnection: "encoder", "ssl_context", "protocol", + "client_cache", + "cache_blacklist", + "cache_whitelist", "_reader", "_writer", "_parser", @@ -148,6 +157,13 @@ def __init__( encoder_class: Type[Encoder] = Encoder, credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, + cache_enable: bool = False, + client_cache: Optional[_LocalCache] = None, + cache_max_size: int = 100, + cache_ttl: int = 0, + cache_eviction_policy: str = DEFAULT_EVICTION_POLICY, + cache_blacklist: List[str] = DEFAULT_BLACKLIST, + cache_whitelist: List[str] = DEFAULT_WHITELIST, ): if (username or password) and credential_provider is not None: raise DataError( @@ -205,6 +221,14 @@ def __init__( if p < 2 or p > 3: raise ConnectionError("protocol must be either 2 or 3") self.protocol = protocol + if cache_enable: + _cache = _LocalCache(cache_max_size, cache_ttl, cache_eviction_policy) + else: + _cache = None + self.client_cache = client_cache if client_cache is not None else _cache + if self.client_cache is not None: + self.cache_blacklist = cache_blacklist + self.cache_whitelist = cache_whitelist def __del__(self, _warnings: Any = warnings): # For some reason, the individual streams don't get properly garbage @@ -395,6 +419,11 @@ async def on_connect(self) -> None: # if a database is specified, switch to it. Also pipeline this if self.db: await self.send_command("SELECT", self.db) + # if client caching is enabled, start tracking + if self.client_cache: + await self.send_command("CLIENT", "TRACKING", "ON") + await self.read_response() + self._parser.set_invalidation_push_handler(self._cache_invalidation_process) # read responses from pipeline for _ in (sent for sent in (self.lib_name, self.lib_version) if sent): @@ -429,6 +458,9 @@ async def disconnect(self, nowait: bool = False) -> None: raise TimeoutError( f"Timed out closing connection after {self.socket_connect_timeout}" ) from None + finally: + if self.client_cache: + self.client_cache.flush() async def _send_ping(self): """Send PING, expect PONG in return""" @@ -646,10 +678,62 @@ def pack_commands(self, commands: Iterable[Iterable[EncodableT]]) -> List[bytes] output.append(SYM_EMPTY.join(pieces)) return output - def _is_socket_empty(self): + def _socket_is_empty(self): """Check if the socket is empty""" return not self._reader.at_eof() + def _cache_invalidation_process( + self, data: List[Union[str, Optional[List[str]]]] + ) -> None: + """ + Invalidate (delete) all redis commands associated with a specific key. + `data` is a list of strings, where the first string is the invalidation message + and the second string is the list of keys to invalidate. + (if the list of keys is None, then all keys are invalidated) + """ + if data[1] is not None: + self.client_cache.flush() + else: + for key in data[1]: + self.client_cache.invalidate(str_if_bytes(key)) + + async def _get_from_local_cache(self, command: str): + """ + If the command is in the local cache, return the response + """ + if ( + self.client_cache is None + or command[0] in self.cache_blacklist + or command[0] not in self.cache_whitelist + ): + return None + while not self._socket_is_empty(): + await self.read_response(push_request=True) + return self.client_cache.get(command) + + def _add_to_local_cache( + self, command: Tuple[str], response: ResponseT, keys: List[KeysT] + ): + """ + Add the command and response to the local cache if the command + is allowed to be cached + """ + if ( + self.client_cache is not None + and (self.cache_blacklist == [] or command[0] not in self.cache_blacklist) + and (self.cache_whitelist == [] or command[0] in self.cache_whitelist) + ): + self.client_cache.set(command, response, keys) + + def delete_from_local_cache(self, command: str): + """ + Delete the command from the local cache + """ + try: + self.client_cache.delete(command) + except AttributeError: + pass + class Connection(AbstractConnection): "Manages TCP communication to and from a Redis server" diff --git a/redis/client.py b/redis/client.py index 7f2c8d290d..d685145339 100755 --- a/redis/client.py +++ b/redis/client.py @@ -4,8 +4,14 @@ import time import warnings from itertools import chain -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, List, Optional, Type, Union +from redis._cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from redis._parsers.encoders import Encoder from redis._parsers.helpers import ( _RedisCallbacks, @@ -13,12 +19,6 @@ _RedisCallbacksRESP3, bool_ok, ) -from redis.cache import ( - DEFAULT_BLACKLIST, - DEFAULT_EVICTION_POLICY, - DEFAULT_WHITELIST, - _LocalCache, -) from redis.commands import ( CoreCommands, RedisModuleCommands, @@ -38,7 +38,6 @@ ) from redis.lock import Lock from redis.retry import Retry -from redis.typing import KeysT, ResponseT from redis.utils import ( HIREDIS_AVAILABLE, _set_info_logger, @@ -268,6 +267,13 @@ def __init__( "redis_connect_func": redis_connect_func, "credential_provider": credential_provider, "protocol": protocol, + "cache_enable": cache_enable, + "client_cache": client_cache, + "cache_max_size": cache_max_size, + "cache_ttl": cache_ttl, + "cache_eviction_policy": cache_eviction_policy, + "cache_blacklist": cache_blacklist, + "cache_whitelist": cache_whitelist, } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -324,19 +330,6 @@ def __init__( else: self.response_callbacks.update(_RedisCallbacksRESP2) - self.client_cache = client_cache - if cache_enable: - self.client_cache = _LocalCache( - cache_max_size, cache_ttl, cache_eviction_policy - ) - if self.client_cache is not None: - self.cache_blacklist = cache_blacklist - self.cache_whitelist = cache_whitelist - self.client_tracking_on() - self.connection._parser.set_invalidation_push_handler( - self._cache_invalidation_process - ) - def __repr__(self) -> str: return ( f"<{type(self).__module__}.{type(self).__name__}" @@ -362,21 +355,6 @@ def set_response_callback(self, command: str, callback: Callable) -> None: """Set a custom Response Callback""" self.response_callbacks[command] = callback - def _cache_invalidation_process( - self, data: List[Union[str, Optional[List[str]]]] - ) -> None: - """ - Invalidate (delete) all redis commands associated with a specific key. - `data` is a list of strings, where the first string is the invalidation message - and the second string is the list of keys to invalidate. - (if the list of keys is None, then all keys are invalidated) - """ - if data[1] is not None: - for key in data[1]: - self.client_cache.invalidate(str_if_bytes(key)) - else: - self.client_cache.flush() - def load_external_module(self, funcname, func) -> None: """ This function can be used to add externally defined redis modules, @@ -549,8 +527,6 @@ def close(self): if self.auto_close_connection_pool: self.connection_pool.disconnect() - if self.client_cache: - self.client_cache.flush() def _send_command_parse_response(self, conn, command_name, *args, **options): """ @@ -572,55 +548,17 @@ def _disconnect_raise(self, conn, error): ): raise error - def _get_from_local_cache(self, command: str): - """ - If the command is in the local cache, return the response - """ - if ( - self.client_cache is None - or command[0] in self.cache_blacklist - or command[0] not in self.cache_whitelist - ): - return None - while not self.connection._is_socket_empty(): - self.connection.read_response(push_request=True) - return self.client_cache.get(command) - - def _add_to_local_cache( - self, command: Tuple[str], response: ResponseT, keys: List[KeysT] - ): - """ - Add the command and response to the local cache if the command - is allowed to be cached - """ - if ( - self.client_cache is not None - and (self.cache_blacklist == [] or command[0] not in self.cache_blacklist) - and (self.cache_whitelist == [] or command[0] in self.cache_whitelist) - ): - self.client_cache.set(command, response, keys) - - def delete_from_local_cache(self, command: str): - """ - Delete the command from the local cache - """ - try: - self.client_cache.delete(command) - except AttributeError: - pass - # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): """Execute a command and return a parsed response""" command_name = args[0] keys = options.pop("keys", None) - response_from_cache = self._get_from_local_cache(args) + pool = self.connection_pool + conn = self.connection or pool.get_connection(command_name, **options) + response_from_cache = conn._get_from_local_cache(args) if response_from_cache is not None: return response_from_cache else: - pool = self.connection_pool - conn = self.connection or pool.get_connection(command_name, **options) - try: response = conn.retry.call_with_retry( lambda: self._send_command_parse_response( @@ -628,7 +566,7 @@ def execute_command(self, *args, **options): ), lambda error: self._disconnect_raise(conn, error), ) - self._add_to_local_cache(args, response, keys) + conn._add_to_local_cache(args, response, keys) return response finally: if not self.connection: diff --git a/redis/connection.py b/redis/connection.py index 35a4ff4a37..a09fb3949c 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -10,9 +10,15 @@ from itertools import chain from queue import Empty, Full, LifoQueue from time import time -from typing import Any, Callable, List, Optional, Type, Union +from typing import Any, Callable, List, Optional, Tuple, Type, Union from urllib.parse import parse_qs, unquote, urlparse +from ._cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from ._parsers import Encoder, _HiredisParser, _RESP2Parser, _RESP3Parser from .backoff import NoBackoff from .credentials import CredentialProvider, UsernamePasswordCredentialProvider @@ -27,6 +33,7 @@ TimeoutError, ) from .retry import Retry +from .typing import KeysT, ResponseT from .utils import ( CRYPTOGRAPHY_AVAILABLE, HIREDIS_AVAILABLE, @@ -150,6 +157,13 @@ def __init__( credential_provider: Optional[CredentialProvider] = None, protocol: Optional[int] = 2, command_packer: Optional[Callable[[], None]] = None, + cache_enable: bool = False, + client_cache: Optional[_LocalCache] = None, + cache_max_size: int = 100, + cache_ttl: int = 0, + cache_eviction_policy: str = DEFAULT_EVICTION_POLICY, + cache_blacklist: List[str] = DEFAULT_BLACKLIST, + cache_whitelist: List[str] = DEFAULT_WHITELIST, ): """ Initialize a new Connection. @@ -215,6 +229,18 @@ def __init__( # p = DEFAULT_RESP_VERSION self.protocol = p self._command_packer = self._construct_command_packer(command_packer) + if cache_enable: + _cache = _LocalCache(cache_max_size, cache_ttl, cache_eviction_policy) + else: + _cache = None + self.client_cache = client_cache if client_cache is not None else _cache + if self.client_cache is not None: + if self.protocol not in [3, "3"]: + raise RedisError( + "client caching is only supported with protocol version 3 or higher" + ) + self.cache_blacklist = cache_blacklist + self.cache_whitelist = cache_whitelist def __repr__(self): repr_args = ",".join([f"{k}={v}" for k, v in self.repr_pieces()]) @@ -406,6 +432,12 @@ def on_connect(self): if str_if_bytes(self.read_response()) != "OK": raise ConnectionError("Invalid Database") + # if client caching is enabled, start tracking + if self.client_cache: + self.send_command("CLIENT", "TRACKING", "ON") + self.read_response() + self._parser.set_invalidation_push_handler(self._cache_invalidation_process) + def disconnect(self, *args): "Disconnects from the Redis server" self._parser.on_disconnect() @@ -426,6 +458,9 @@ def disconnect(self, *args): except OSError: pass + if self.client_cache: + self.client_cache.flush() + def _send_ping(self): """Send PING, expect PONG in return""" self.send_command("PING", check_health=False) @@ -573,11 +608,63 @@ def pack_commands(self, commands): output.append(SYM_EMPTY.join(pieces)) return output - def _is_socket_empty(self): + def _socket_is_empty(self): """Check if the socket is empty""" r, _, _ = select.select([self._sock], [], [], 0) return not bool(r) + def _cache_invalidation_process( + self, data: List[Union[str, Optional[List[str]]]] + ) -> None: + """ + Invalidate (delete) all redis commands associated with a specific key. + `data` is a list of strings, where the first string is the invalidation message + and the second string is the list of keys to invalidate. + (if the list of keys is None, then all keys are invalidated) + """ + if data[1] is None: + self.client_cache.flush() + else: + for key in data[1]: + self.client_cache.invalidate(str_if_bytes(key)) + + def _get_from_local_cache(self, command: str): + """ + If the command is in the local cache, return the response + """ + if ( + self.client_cache is None + or command[0] in self.cache_blacklist + or command[0] not in self.cache_whitelist + ): + return None + while not self._socket_is_empty(): + self.read_response(push_request=True) + return self.client_cache.get(command) + + def _add_to_local_cache( + self, command: Tuple[str], response: ResponseT, keys: List[KeysT] + ): + """ + Add the command and response to the local cache if the command + is allowed to be cached + """ + if ( + self.client_cache is not None + and (self.cache_blacklist == [] or command[0] not in self.cache_blacklist) + and (self.cache_whitelist == [] or command[0] in self.cache_whitelist) + ): + self.client_cache.set(command, response, keys) + + def delete_from_local_cache(self, command: str): + """ + Delete the command from the local cache + """ + try: + self.client_cache.delete(command) + except AttributeError: + pass + class Connection(AbstractConnection): "Manages TCP communication to and from a Redis server" diff --git a/tests/conftest.py b/tests/conftest.py index bad9f43e42..e56b5f6aed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -356,6 +356,7 @@ def _gen_cluster_mock_resp(r, response): connection = Mock(spec=Connection) connection.retry = Retry(NoBackoff(), 0) connection.read_response.return_value = response + connection._get_from_local_cache.return_value = None with mock.patch.object(r, "connection", connection): yield r diff --git a/tests/test_asyncio/conftest.py b/tests/test_asyncio/conftest.py index 5d9e0b4f2e..c79b706abc 100644 --- a/tests/test_asyncio/conftest.py +++ b/tests/test_asyncio/conftest.py @@ -141,6 +141,7 @@ def _gen_cluster_mock_resp(r, response): connection = mock.AsyncMock(spec=Connection) connection.retry = Retry(NoBackoff(), 0) connection.read_response.return_value = response + connection._get_from_local_cache.return_value = None with mock.patch.object(r, "connection", connection): yield r diff --git a/tests/test_asyncio/test_cache.py b/tests/test_asyncio/test_cache.py index c837acfed1..92328b8391 100644 --- a/tests/test_asyncio/test_cache.py +++ b/tests/test_asyncio/test_cache.py @@ -2,36 +2,38 @@ import pytest import redis.asyncio as redis +from redis._cache import _LocalCache from redis.utils import HIREDIS_AVAILABLE @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") async def test_get_from_cache(): - r = redis.Redis(cache_enable=True, single_connection_client=True, protocol=3) + cache = _LocalCache() + r = redis.Redis(protocol=3, client_cache=cache) r2 = redis.Redis(protocol=3) # add key to redis await r.set("foo", "bar") # get key from redis and save in local cache assert await r.get("foo") == b"bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" # change key in redis (cause invalidation) await r2.set("foo", "barbar") # send any command to redis (process invalidation in background) await r.ping() # the command is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None # get key from redis assert await r.get("foo") == b"barbar" + await r.flushdb() await r.aclose() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") async def test_cache_max_size(): - r = redis.Redis( - cache_enable=True, cache_max_size=3, single_connection_client=True, protocol=3 - ) + cache = _LocalCache(max_size=3) + r = redis.Redis(client_cache=cache, protocol=3) # add 3 keys to redis await r.set("foo", "bar") await r.set("foo2", "bar2") @@ -41,46 +43,42 @@ async def test_cache_max_size(): assert await r.get("foo2") == b"bar2" assert await r.get("foo3") == b"bar3" # get the 3 keys from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo2")) == b"bar2" - assert r.client_cache.get(("GET", "foo3")) == b"bar3" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) == b"bar2" + assert cache.get(("GET", "foo3")) == b"bar3" # add 1 more key to redis (exceed the max size) await r.set("foo4", "bar4") assert await r.get("foo4") == b"bar4" # the first key is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None + await r.flushdb() await r.aclose() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") async def test_cache_ttl(): - r = redis.Redis( - cache_enable=True, cache_ttl=1, single_connection_client=True, protocol=3 - ) + cache = _LocalCache(ttl=1) + r = redis.Redis(client_cache=cache, protocol=3) # add key to redis await r.set("foo", "bar") # get key from redis and save in local cache assert await r.get("foo") == b"bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" # wait for the key to expire time.sleep(1) # the key is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None + await r.flushdb() await r.aclose() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") async def test_cache_lfu_eviction(): - r = redis.Redis( - cache_enable=True, - cache_max_size=3, - cache_eviction_policy="lfu", - single_connection_client=True, - protocol=3, - ) + cache = _LocalCache(max_size=3, eviction_policy="lfu") + r = redis.Redis(client_cache=cache, protocol=3) # add 3 keys to redis await r.set("foo", "bar") await r.set("foo2", "bar2") @@ -90,40 +88,53 @@ async def test_cache_lfu_eviction(): assert await r.get("foo2") == b"bar2" assert await r.get("foo3") == b"bar3" # change the order of the keys in the cache - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo3")) == b"bar3" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo3")) == b"bar3" # add 1 more key to redis (exceed the max size) await r.set("foo4", "bar4") assert await r.get("foo4") == b"bar4" # test the eviction policy - assert len(r.client_cache.cache) == 3 - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo2")) is None + assert len(cache.cache) == 3 + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) is None + await r.flushdb() await r.aclose() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") async def test_cache_decode_response(): - r = redis.Redis( - decode_responses=True, - cache_enable=True, - single_connection_client=True, - protocol=3, - ) + cache = _LocalCache() + r = redis.Redis(decode_responses=True, client_cache=cache, protocol=3) await r.set("foo", "bar") # get key from redis and save in local cache assert await r.get("foo") == "bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == "bar" + assert cache.get(("GET", "foo")) == "bar" # change key in redis (cause invalidation) await r.set("foo", "barbar") # send any command to redis (process invalidation in background) await r.ping() # the command is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None # get key from redis assert await r.get("foo") == "barbar" + await r.flushdb() + await r.aclose() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +async def test_cache_blacklist(): + cache = _LocalCache() + r = redis.Redis(client_cache=cache, cache_blacklist=["LLEN"], protocol=3) + # add list to redis + await r.lpush("mylist", "foo", "bar", "baz") + assert await r.llen("mylist") == 3 + assert await r.lindex("mylist", 1) == b"bar" + assert cache.get(("LLEN", "mylist")) is None + assert cache.get(("LINDEX", "mylist", 1)) == b"bar" + + await r.flushdb() await r.aclose() diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 55a1c3a2f6..4ff3808602 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -68,8 +68,9 @@ async def call_with_retry(self, _, __): in_use = False return "foo" - mock_conn = mock.MagicMock() + mock_conn = mock.AsyncMock(spec=Connection) mock_conn.retry = Retry_() + mock_conn._get_from_local_cache.return_value = None async def get_conn(_): # Validate only one client is created in single-client mode when diff --git a/tests/test_cache.py b/tests/test_cache.py index 45621fe77e..85df8b1a22 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -2,34 +2,37 @@ import pytest import redis +from redis._cache import _LocalCache from redis.utils import HIREDIS_AVAILABLE @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") def test_get_from_cache(): - r = redis.Redis(cache_enable=True, single_connection_client=True, protocol=3) + cache = _LocalCache() + r = redis.Redis(protocol=3, client_cache=cache) r2 = redis.Redis(protocol=3) # add key to redis r.set("foo", "bar") # get key from redis and save in local cache assert r.get("foo") == b"bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" # change key in redis (cause invalidation) r2.set("foo", "barbar") # send any command to redis (process invalidation in background) r.ping() # the command is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None # get key from redis assert r.get("foo") == b"barbar" + r.flushdb() + @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") def test_cache_max_size(): - r = redis.Redis( - cache_enable=True, cache_max_size=3, single_connection_client=True, protocol=3 - ) + cache = _LocalCache(max_size=3) + r = redis.Redis(client_cache=cache, protocol=3) # add 3 keys to redis r.set("foo", "bar") r.set("foo2", "bar2") @@ -39,42 +42,40 @@ def test_cache_max_size(): assert r.get("foo2") == b"bar2" assert r.get("foo3") == b"bar3" # get the 3 keys from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo2")) == b"bar2" - assert r.client_cache.get(("GET", "foo3")) == b"bar3" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) == b"bar2" + assert cache.get(("GET", "foo3")) == b"bar3" # add 1 more key to redis (exceed the max size) r.set("foo4", "bar4") assert r.get("foo4") == b"bar4" # the first key is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None + + r.flushdb() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") def test_cache_ttl(): - r = redis.Redis( - cache_enable=True, cache_ttl=1, single_connection_client=True, protocol=3 - ) + cache = _LocalCache(ttl=1) + r = redis.Redis(client_cache=cache, protocol=3) # add key to redis r.set("foo", "bar") # get key from redis and save in local cache assert r.get("foo") == b"bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" # wait for the key to expire time.sleep(1) # the key is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None + + r.flushdb() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") def test_cache_lfu_eviction(): - r = redis.Redis( - cache_enable=True, - cache_max_size=3, - cache_eviction_policy="lfu", - single_connection_client=True, - protocol=3, - ) + cache = _LocalCache(max_size=3, eviction_policy="lfu") + r = redis.Redis(client_cache=cache, protocol=3) # add 3 keys to redis r.set("foo", "bar") r.set("foo2", "bar2") @@ -84,36 +85,50 @@ def test_cache_lfu_eviction(): assert r.get("foo2") == b"bar2" assert r.get("foo3") == b"bar3" # change the order of the keys in the cache - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo3")) == b"bar3" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo3")) == b"bar3" # add 1 more key to redis (exceed the max size) r.set("foo4", "bar4") assert r.get("foo4") == b"bar4" # test the eviction policy - assert len(r.client_cache.cache) == 3 - assert r.client_cache.get(("GET", "foo")) == b"bar" - assert r.client_cache.get(("GET", "foo2")) is None + assert len(cache.cache) == 3 + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) is None + + r.flushdb() @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") def test_cache_decode_response(): - r = redis.Redis( - decode_responses=True, - cache_enable=True, - single_connection_client=True, - protocol=3, - ) + cache = _LocalCache() + r = redis.Redis(decode_responses=True, client_cache=cache, protocol=3) r.set("foo", "bar") # get key from redis and save in local cache assert r.get("foo") == "bar" # get key from local cache - assert r.client_cache.get(("GET", "foo")) == "bar" + assert cache.get(("GET", "foo")) == "bar" # change key in redis (cause invalidation) r.set("foo", "barbar") # send any command to redis (process invalidation in background) r.ping() # the command is not in the local cache anymore - assert r.client_cache.get(("GET", "foo")) is None + assert cache.get(("GET", "foo")) is None # get key from redis assert r.get("foo") == "barbar" + + r.flushdb() + + +@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") +def test_cache_blacklist(): + cache = _LocalCache() + r = redis.Redis(client_cache=cache, cache_blacklist=["LLEN"], protocol=3) + # add list to redis + r.lpush("mylist", "foo", "bar", "baz") + assert r.llen("mylist") == 3 + assert r.lindex("mylist", 1) == b"bar" + assert cache.get(("LLEN", "mylist")) is None + assert cache.get(("LINDEX", "mylist", 1)) == b"bar" + + r.flushdb() From b5d4d29409f83e9a0056c6ed12414bf58e1ea1c0 Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Sun, 7 Jan 2024 14:46:23 +0200 Subject: [PATCH 009/196] Version 5.1.0b2 (#3100) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8329f00066..795aecd142 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ long_description_content_type="text/markdown", keywords=["Redis", "key-value store", "database"], license="MIT", - version="5.1.0b1", + version="5.1.0b2", packages=find_packages( include=[ "redis", From c7a13ae3f2529ee3f1bd0b5f991de4d06c33a6db Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Tue, 9 Jan 2024 12:47:00 +0200 Subject: [PATCH 010/196] Support client side caching with RedisCluster (#3102) * sync * fix mock_node_resp * fix mock_node_resp_func * fix test_handling_cluster_failover_to_a_replica * fix test_handling_cluster_failover_to_a_replica * async cluster and cleanup tests * delete comment --- redis/asyncio/cluster.py | 46 ++++- redis/asyncio/connection.py | 4 + redis/cluster.py | 29 ++- tests/conftest.py | 2 +- tests/test_asyncio/conftest.py | 3 +- tests/test_asyncio/test_cache.py | 291 ++++++++++++++++------------- tests/test_asyncio/test_cluster.py | 2 + tests/test_cache.py | 286 ++++++++++++++++------------ tests/test_cluster.py | 3 + 9 files changed, 398 insertions(+), 268 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 6a1753ad19..486053e1cc 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -18,6 +18,12 @@ Union, ) +from redis._cache import ( + DEFAULT_BLACKLIST, + DEFAULT_EVICTION_POLICY, + DEFAULT_WHITELIST, + _LocalCache, +) from redis._parsers import AsyncCommandsParser, Encoder from redis._parsers.helpers import ( _RedisCallbacks, @@ -267,6 +273,13 @@ def __init__( ssl_keyfile: Optional[str] = None, protocol: Optional[int] = 2, address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None, + cache_enable: bool = False, + client_cache: Optional[_LocalCache] = None, + cache_max_size: int = 100, + cache_ttl: int = 0, + cache_eviction_policy: str = DEFAULT_EVICTION_POLICY, + cache_blacklist: List[str] = DEFAULT_BLACKLIST, + cache_whitelist: List[str] = DEFAULT_WHITELIST, ) -> None: if db: raise RedisClusterException( @@ -310,6 +323,14 @@ def __init__( "socket_timeout": socket_timeout, "retry": retry, "protocol": protocol, + # Client cache related kwargs + "cache_enable": cache_enable, + "client_cache": client_cache, + "cache_max_size": cache_max_size, + "cache_ttl": cache_ttl, + "cache_eviction_policy": cache_eviction_policy, + "cache_blacklist": cache_blacklist, + "cache_whitelist": cache_whitelist, } if ssl: @@ -682,7 +703,6 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: :raises RedisClusterException: if target_nodes is not provided & the command can't be mapped to a slot """ - kwargs.pop("keys", None) # the keys are used only for client side caching command = args[0] target_nodes = [] target_nodes_specified = False @@ -1039,16 +1059,24 @@ async def parse_response( async def execute_command(self, *args: Any, **kwargs: Any) -> Any: # Acquire connection connection = self.acquire_connection() + keys = kwargs.pop("keys", None) - # Execute command - await connection.send_packed_command(connection.pack_command(*args), False) - - # Read response - try: - return await self.parse_response(connection, args[0], **kwargs) - finally: - # Release connection + response_from_cache = await connection._get_from_local_cache(args) + if response_from_cache is not None: self._free.append(connection) + return response_from_cache + else: + # Execute command + await connection.send_packed_command(connection.pack_command(*args), False) + + # Read response + try: + response = await self.parse_response(connection, args[0], **kwargs) + connection._add_to_local_cache(args, response, keys) + return response + finally: + # Release connection + self._free.append(connection) async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 7f1c0b71e4..05a27879a6 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -227,6 +227,10 @@ def __init__( _cache = None self.client_cache = client_cache if client_cache is not None else _cache if self.client_cache is not None: + if self.protocol not in [3, "3"]: + raise RedisError( + "client caching is only supported with protocol version 3 or higher" + ) self.cache_blacklist = cache_blacklist self.cache_whitelist = cache_whitelist diff --git a/redis/cluster.py b/redis/cluster.py index 8032173e66..7bdf4c1951 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -167,6 +167,13 @@ def parse_cluster_myshardid(resp, **options): "ssl_password", "unix_socket_path", "username", + "cache_enable", + "client_cache", + "cache_max_size", + "cache_ttl", + "cache_eviction_policy", + "cache_blacklist", + "cache_whitelist", ) KWARGS_DISABLED_KEYS = ("host", "port") @@ -1060,7 +1067,6 @@ def execute_command(self, *args, **kwargs): list dict """ - kwargs.pop("keys", None) # the keys are used only for client side caching target_nodes_specified = False is_default_node = False target_nodes = None @@ -1119,6 +1125,7 @@ def _execute_command(self, target_node, *args, **kwargs): """ Send a command to a node in the cluster """ + keys = kwargs.pop("keys", None) command = args[0] redis_node = None connection = None @@ -1147,14 +1154,18 @@ def _execute_command(self, target_node, *args, **kwargs): connection.send_command("ASKING") redis_node.parse_response(connection, "ASKING", **kwargs) asking = False - - connection.send_command(*args) - response = redis_node.parse_response(connection, command, **kwargs) - if command in self.cluster_response_callbacks: - response = self.cluster_response_callbacks[command]( - response, **kwargs - ) - return response + response_from_cache = connection._get_from_local_cache(args) + if response_from_cache is not None: + return response_from_cache + else: + connection.send_command(*args) + response = redis_node.parse_response(connection, command, **kwargs) + if command in self.cluster_response_callbacks: + response = self.cluster_response_callbacks[command]( + response, **kwargs + ) + connection._add_to_local_cache(args, response, keys) + return response except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: diff --git a/tests/conftest.py b/tests/conftest.py index e56b5f6aed..8786e2b9f0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -275,7 +275,7 @@ def _get_client( redis_url = request.config.getoption("--redis-url") else: redis_url = from_url - if "protocol" not in redis_url: + if "protocol" not in redis_url and kwargs.get("protocol") is None: kwargs["protocol"] = request.config.getoption("--protocol") cluster_mode = REDIS_INFO["cluster_enabled"] diff --git a/tests/test_asyncio/conftest.py b/tests/test_asyncio/conftest.py index c79b706abc..c6afec5af6 100644 --- a/tests/test_asyncio/conftest.py +++ b/tests/test_asyncio/conftest.py @@ -69,10 +69,9 @@ async def client_factory( url: str = request.config.getoption("--redis-url"), cls=redis.Redis, flushdb=True, - protocol=request.config.getoption("--protocol"), **kwargs, ): - if "protocol" not in url: + if "protocol" not in url and kwargs.get("protocol") is None: kwargs["protocol"] = request.config.getoption("--protocol") cluster_mode = REDIS_INFO["cluster_enabled"] diff --git a/tests/test_asyncio/test_cache.py b/tests/test_asyncio/test_cache.py index 92328b8391..098ede8d75 100644 --- a/tests/test_asyncio/test_cache.py +++ b/tests/test_asyncio/test_cache.py @@ -1,140 +1,177 @@ import time import pytest -import redis.asyncio as redis +import pytest_asyncio from redis._cache import _LocalCache from redis.utils import HIREDIS_AVAILABLE -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_get_from_cache(): - cache = _LocalCache() - r = redis.Redis(protocol=3, client_cache=cache) - r2 = redis.Redis(protocol=3) - # add key to redis - await r.set("foo", "bar") - # get key from redis and save in local cache - assert await r.get("foo") == b"bar" - # get key from local cache - assert cache.get(("GET", "foo")) == b"bar" - # change key in redis (cause invalidation) - await r2.set("foo", "barbar") - # send any command to redis (process invalidation in background) - await r.ping() - # the command is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - # get key from redis - assert await r.get("foo") == b"barbar" - - await r.flushdb() - await r.aclose() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_cache_max_size(): - cache = _LocalCache(max_size=3) - r = redis.Redis(client_cache=cache, protocol=3) - # add 3 keys to redis - await r.set("foo", "bar") - await r.set("foo2", "bar2") - await r.set("foo3", "bar3") - # get 3 keys from redis and save in local cache - assert await r.get("foo") == b"bar" - assert await r.get("foo2") == b"bar2" - assert await r.get("foo3") == b"bar3" - # get the 3 keys from local cache - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo2")) == b"bar2" - assert cache.get(("GET", "foo3")) == b"bar3" - # add 1 more key to redis (exceed the max size) - await r.set("foo4", "bar4") - assert await r.get("foo4") == b"bar4" - # the first key is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - - await r.flushdb() - await r.aclose() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_cache_ttl(): - cache = _LocalCache(ttl=1) - r = redis.Redis(client_cache=cache, protocol=3) - # add key to redis - await r.set("foo", "bar") - # get key from redis and save in local cache - assert await r.get("foo") == b"bar" - # get key from local cache - assert cache.get(("GET", "foo")) == b"bar" - # wait for the key to expire - time.sleep(1) - # the key is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - - await r.flushdb() - await r.aclose() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_cache_lfu_eviction(): - cache = _LocalCache(max_size=3, eviction_policy="lfu") - r = redis.Redis(client_cache=cache, protocol=3) - # add 3 keys to redis - await r.set("foo", "bar") - await r.set("foo2", "bar2") - await r.set("foo3", "bar3") - # get 3 keys from redis and save in local cache - assert await r.get("foo") == b"bar" - assert await r.get("foo2") == b"bar2" - assert await r.get("foo3") == b"bar3" - # change the order of the keys in the cache - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo3")) == b"bar3" - # add 1 more key to redis (exceed the max size) - await r.set("foo4", "bar4") - assert await r.get("foo4") == b"bar4" - # test the eviction policy - assert len(cache.cache) == 3 - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo2")) is None - - await r.flushdb() - await r.aclose() +@pytest_asyncio.fixture +async def r(request, create_redis): + cache = request.param.get("cache") + kwargs = request.param.get("kwargs", {}) + r = await create_redis(protocol=3, client_cache=cache, **kwargs) + yield r, cache @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_cache_decode_response(): - cache = _LocalCache() - r = redis.Redis(decode_responses=True, client_cache=cache, protocol=3) - await r.set("foo", "bar") - # get key from redis and save in local cache - assert await r.get("foo") == "bar" - # get key from local cache - assert cache.get(("GET", "foo")) == "bar" - # change key in redis (cause invalidation) - await r.set("foo", "barbar") - # send any command to redis (process invalidation in background) - await r.ping() - # the command is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - # get key from redis - assert await r.get("foo") == "barbar" - - await r.flushdb() - await r.aclose() +class TestLocalCache: + @pytest.mark.onlynoncluster + @pytest.mark.parametrize("r", [{"cache": _LocalCache()}], indirect=True) + async def test_get_from_cache(self, r, r2): + r, cache = r + # add key to redis + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + await r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + await r.ping() + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == b"barbar" + + @pytest.mark.parametrize("r", [{"cache": _LocalCache(max_size=3)}], indirect=True) + async def test_cache_max_size(self, r): + r, cache = r + # add 3 keys to redis + await r.set("foo", "bar") + await r.set("foo2", "bar2") + await r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert await r.get("foo") == b"bar" + assert await r.get("foo2") == b"bar2" + assert await r.get("foo3") == b"bar3" + # get the 3 keys from local cache + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) == b"bar2" + assert cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + await r.set("foo4", "bar4") + assert await r.get("foo4") == b"bar4" + # the first key is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + + @pytest.mark.parametrize("r", [{"cache": _LocalCache(ttl=1)}], indirect=True) + async def test_cache_ttl(self, r): + r, cache = r + # add key to redis + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # wait for the key to expire + time.sleep(1) + # the key is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + + @pytest.mark.parametrize( + "r", [{"cache": _LocalCache(max_size=3, eviction_policy="lfu")}], indirect=True + ) + async def test_cache_lfu_eviction(self, r): + r, cache = r + # add 3 keys to redis + await r.set("foo", "bar") + await r.set("foo2", "bar2") + await r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert await r.get("foo") == b"bar" + assert await r.get("foo2") == b"bar2" + assert await r.get("foo3") == b"bar3" + # change the order of the keys in the cache + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + await r.set("foo4", "bar4") + assert await r.get("foo4") == b"bar4" + # test the eviction policy + assert len(cache.cache) == 3 + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) is None + + @pytest.mark.onlynoncluster + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"decode_responses": True}}], + indirect=True, + ) + async def test_cache_decode_response(self, r): + r, cache = r + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == "bar" + # get key from local cache + assert cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + await r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + await r.ping() + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == "barbar" + + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"cache_blacklist": ["LLEN"]}}], + indirect=True, + ) + async def test_cache_blacklist(self, r): + r, cache = r + # add list to redis + await r.lpush("mylist", "foo", "bar", "baz") + assert await r.llen("mylist") == 3 + assert await r.lindex("mylist", 1) == b"bar" + assert cache.get(("LLEN", "mylist")) is None + assert cache.get(("LINDEX", "mylist", 1)) == b"bar" @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -async def test_cache_blacklist(): - cache = _LocalCache() - r = redis.Redis(client_cache=cache, cache_blacklist=["LLEN"], protocol=3) - # add list to redis - await r.lpush("mylist", "foo", "bar", "baz") - assert await r.llen("mylist") == 3 - assert await r.lindex("mylist", 1) == b"bar" - assert cache.get(("LLEN", "mylist")) is None - assert cache.get(("LINDEX", "mylist", 1)) == b"bar" - - await r.flushdb() - await r.aclose() +@pytest.mark.onlycluster +class TestClusterLocalCache: + @pytest.mark.parametrize("r", [{"cache": _LocalCache()}], indirect=True) + async def test_get_from_cache(self, r, r2): + r, cache = r + # add key to redis + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + await r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + node = r.get_node_from_key("foo") + await r.ping(target_nodes=node) + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == b"barbar" + + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"decode_responses": True}}], + indirect=True, + ) + async def test_cache_decode_response(self, r): + r, cache = r + await r.set("foo", "bar") + # get key from redis and save in local cache + assert await r.get("foo") == "bar" + # get key from local cache + assert cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + await r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + node = r.get_node_from_key("foo") + await r.ping(target_nodes=node) + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert await r.get("foo") == "barbar" diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index e6cf2e4ce7..a57d32f5d2 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -178,6 +178,7 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode: connection = mock.AsyncMock(spec=Connection) connection.is_connected = True connection.read_response.return_value = response + connection._get_from_local_cache.return_value = None while node._free: node._free.pop() node._free.append(connection) @@ -188,6 +189,7 @@ def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode: connection = mock.AsyncMock(spec=Connection) connection.is_connected = True connection.read_response.side_effect = exc + connection._get_from_local_cache.return_value = None while node._free: node._free.pop() node._free.append(connection) diff --git a/tests/test_cache.py b/tests/test_cache.py index 85df8b1a22..570385a4b5 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,131 +4,177 @@ import redis from redis._cache import _LocalCache from redis.utils import HIREDIS_AVAILABLE +from tests.conftest import _get_client -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_get_from_cache(): - cache = _LocalCache() - r = redis.Redis(protocol=3, client_cache=cache) - r2 = redis.Redis(protocol=3) - # add key to redis - r.set("foo", "bar") - # get key from redis and save in local cache - assert r.get("foo") == b"bar" - # get key from local cache - assert cache.get(("GET", "foo")) == b"bar" - # change key in redis (cause invalidation) - r2.set("foo", "barbar") - # send any command to redis (process invalidation in background) - r.ping() - # the command is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - # get key from redis - assert r.get("foo") == b"barbar" - - r.flushdb() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_cache_max_size(): - cache = _LocalCache(max_size=3) - r = redis.Redis(client_cache=cache, protocol=3) - # add 3 keys to redis - r.set("foo", "bar") - r.set("foo2", "bar2") - r.set("foo3", "bar3") - # get 3 keys from redis and save in local cache - assert r.get("foo") == b"bar" - assert r.get("foo2") == b"bar2" - assert r.get("foo3") == b"bar3" - # get the 3 keys from local cache - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo2")) == b"bar2" - assert cache.get(("GET", "foo3")) == b"bar3" - # add 1 more key to redis (exceed the max size) - r.set("foo4", "bar4") - assert r.get("foo4") == b"bar4" - # the first key is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - - r.flushdb() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_cache_ttl(): - cache = _LocalCache(ttl=1) - r = redis.Redis(client_cache=cache, protocol=3) - # add key to redis - r.set("foo", "bar") - # get key from redis and save in local cache - assert r.get("foo") == b"bar" - # get key from local cache - assert cache.get(("GET", "foo")) == b"bar" - # wait for the key to expire - time.sleep(1) - # the key is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - - r.flushdb() - - -@pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_cache_lfu_eviction(): - cache = _LocalCache(max_size=3, eviction_policy="lfu") - r = redis.Redis(client_cache=cache, protocol=3) - # add 3 keys to redis - r.set("foo", "bar") - r.set("foo2", "bar2") - r.set("foo3", "bar3") - # get 3 keys from redis and save in local cache - assert r.get("foo") == b"bar" - assert r.get("foo2") == b"bar2" - assert r.get("foo3") == b"bar3" - # change the order of the keys in the cache - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo3")) == b"bar3" - # add 1 more key to redis (exceed the max size) - r.set("foo4", "bar4") - assert r.get("foo4") == b"bar4" - # test the eviction policy - assert len(cache.cache) == 3 - assert cache.get(("GET", "foo")) == b"bar" - assert cache.get(("GET", "foo2")) is None - - r.flushdb() +@pytest.fixture() +def r(request): + cache = request.param.get("cache") + kwargs = request.param.get("kwargs", {}) + with _get_client( + redis.Redis, request, protocol=3, client_cache=cache, **kwargs + ) as client: + yield client, cache @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_cache_decode_response(): - cache = _LocalCache() - r = redis.Redis(decode_responses=True, client_cache=cache, protocol=3) - r.set("foo", "bar") - # get key from redis and save in local cache - assert r.get("foo") == "bar" - # get key from local cache - assert cache.get(("GET", "foo")) == "bar" - # change key in redis (cause invalidation) - r.set("foo", "barbar") - # send any command to redis (process invalidation in background) - r.ping() - # the command is not in the local cache anymore - assert cache.get(("GET", "foo")) is None - # get key from redis - assert r.get("foo") == "barbar" - - r.flushdb() +class TestLocalCache: + @pytest.mark.onlynoncluster + @pytest.mark.parametrize("r", [{"cache": _LocalCache()}], indirect=True) + def test_get_from_cache(self, r, r2): + r, cache = r + # add key to redis + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + r.ping() + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == b"barbar" + + @pytest.mark.parametrize("r", [{"cache": _LocalCache(max_size=3)}], indirect=True) + def test_cache_max_size(self, r): + r, cache = r + # add 3 keys to redis + r.set("foo", "bar") + r.set("foo2", "bar2") + r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert r.get("foo") == b"bar" + assert r.get("foo2") == b"bar2" + assert r.get("foo3") == b"bar3" + # get the 3 keys from local cache + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) == b"bar2" + assert cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + r.set("foo4", "bar4") + assert r.get("foo4") == b"bar4" + # the first key is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + + @pytest.mark.parametrize("r", [{"cache": _LocalCache(ttl=1)}], indirect=True) + def test_cache_ttl(self, r): + r, cache = r + # add key to redis + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # wait for the key to expire + time.sleep(1) + # the key is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + + @pytest.mark.parametrize( + "r", [{"cache": _LocalCache(max_size=3, eviction_policy="lfu")}], indirect=True + ) + def test_cache_lfu_eviction(self, r): + r, cache = r + # add 3 keys to redis + r.set("foo", "bar") + r.set("foo2", "bar2") + r.set("foo3", "bar3") + # get 3 keys from redis and save in local cache + assert r.get("foo") == b"bar" + assert r.get("foo2") == b"bar2" + assert r.get("foo3") == b"bar3" + # change the order of the keys in the cache + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo3")) == b"bar3" + # add 1 more key to redis (exceed the max size) + r.set("foo4", "bar4") + assert r.get("foo4") == b"bar4" + # test the eviction policy + assert len(cache.cache) == 3 + assert cache.get(("GET", "foo")) == b"bar" + assert cache.get(("GET", "foo2")) is None + + @pytest.mark.onlynoncluster + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"decode_responses": True}}], + indirect=True, + ) + def test_cache_decode_response(self, r): + r, cache = r + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == "bar" + # get key from local cache + assert cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + r.ping() + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == "barbar" + + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"cache_blacklist": ["LLEN"]}}], + indirect=True, + ) + def test_cache_blacklist(self, r): + r, cache = r + # add list to redis + r.lpush("mylist", "foo", "bar", "baz") + assert r.llen("mylist") == 3 + assert r.lindex("mylist", 1) == b"bar" + assert cache.get(("LLEN", "mylist")) is None + assert cache.get(("LINDEX", "mylist", 1)) == b"bar" @pytest.mark.skipif(HIREDIS_AVAILABLE, reason="PythonParser only") -def test_cache_blacklist(): - cache = _LocalCache() - r = redis.Redis(client_cache=cache, cache_blacklist=["LLEN"], protocol=3) - # add list to redis - r.lpush("mylist", "foo", "bar", "baz") - assert r.llen("mylist") == 3 - assert r.lindex("mylist", 1) == b"bar" - assert cache.get(("LLEN", "mylist")) is None - assert cache.get(("LINDEX", "mylist", 1)) == b"bar" - - r.flushdb() +@pytest.mark.onlycluster +class TestClusterLocalCache: + @pytest.mark.parametrize("r", [{"cache": _LocalCache()}], indirect=True) + def test_get_from_cache(self, r, r2): + r, cache = r + # add key to redis + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == b"bar" + # get key from local cache + assert cache.get(("GET", "foo")) == b"bar" + # change key in redis (cause invalidation) + r2.set("foo", "barbar") + # send any command to redis (process invalidation in background) + node = r.get_node_from_key("foo") + r.ping(target_nodes=node) + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == b"barbar" + + @pytest.mark.parametrize( + "r", + [{"cache": _LocalCache(), "kwargs": {"decode_responses": True}}], + indirect=True, + ) + def test_cache_decode_response(self, r): + r, cache = r + r.set("foo", "bar") + # get key from redis and save in local cache + assert r.get("foo") == "bar" + # get key from local cache + assert cache.get(("GET", "foo")) == "bar" + # change key in redis (cause invalidation) + r.set("foo", "barbar") + # send any command to redis (process invalidation in background) + node = r.get_node_from_key("foo") + r.ping(target_nodes=node) + # the command is not in the local cache anymore + assert cache.get(("GET", "foo")) is None + # get key from redis + assert r.get("foo") == "barbar" diff --git a/tests/test_cluster.py b/tests/test_cluster.py index ae194db3a2..854b64c563 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -202,6 +202,7 @@ def cmd_init_mock(self, r): def mock_node_resp(node, response): connection = Mock() connection.read_response.return_value = response + connection._get_from_local_cache.return_value = None node.redis_connection.connection = connection return node @@ -209,6 +210,7 @@ def mock_node_resp(node, response): def mock_node_resp_func(node, func): connection = Mock() connection.read_response.side_effect = func + connection._get_from_local_cache.return_value = None node.redis_connection.connection = connection return node @@ -477,6 +479,7 @@ def mock_execute_command(*_args, **_kwargs): redis_mock_node.execute_command.side_effect = mock_execute_command # Mock response value for all other commands redis_mock_node.parse_response.return_value = "MOCK_OK" + redis_mock_node.connection._get_from_local_cache.return_value = None for node in r.get_nodes(): if node.port != primary.port: node.redis_connection = redis_mock_node From 63932bb0a1bc876410241786b863420e7d1d33e1 Mon Sep 17 00:00:00 2001 From: Chayim Date: Tue, 9 Jan 2024 13:22:09 +0200 Subject: [PATCH 011/196] Always sending codecov (#3101) --- .github/workflows/integration.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index f4ba256359..7aaf346170 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -89,7 +89,6 @@ jobs: - name: Upload codecov coverage uses: codecov/codecov-action@v3 - if: ${{matrix.python-version == '3.11'}} with: fail_ci_if_error: false From 902754b843e6cf1174a114322eea417ee467d454 Mon Sep 17 00:00:00 2001 From: Chayim Date: Tue, 9 Jan 2024 13:43:40 +0200 Subject: [PATCH 012/196] filter commits for main branch (#3036) --- .github/release-drafter-config.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/.github/release-drafter-config.yml b/.github/release-drafter-config.yml index 9ccb28aca4..4607da071c 100644 --- a/.github/release-drafter-config.yml +++ b/.github/release-drafter-config.yml @@ -1,5 +1,7 @@ name-template: '$NEXT_MINOR_VERSION' tag-template: 'v$NEXT_MINOR_VERSION' +filter-by-commitish: true +commitish: master autolabeler: - label: 'maintenance' files: @@ -15,7 +17,7 @@ autolabeler: branch: - '/feature-.+' categories: - - title: 'Breaking Changes' + - title: '🔥 Breaking Changes' labels: - 'breakingchange' - title: '🧪 Experimental Features' @@ -32,7 +34,12 @@ categories: - 'bug' - 'BUG' - title: '🧰 Maintenance' - label: 'maintenance' + labels: + - 'maintenance' + - 'dependencies' + - 'documentation' + - 'docs' + - 'testing' change-template: '- $TITLE (#$NUMBER)' exclude-labels: - 'skip-changelog' From f9b6a5e2a7a1cf29c0044be80fdc237768493282 Mon Sep 17 00:00:00 2001 From: "Wei-Hsiang (Matt) Wang" Date: Tue, 9 Jan 2024 19:46:08 +0800 Subject: [PATCH 013/196] fix(docs): organize cluster mode part of lua scripting (#3073) --- docs/lua_scripting.rst | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/docs/lua_scripting.rst b/docs/lua_scripting.rst index 0edb6b6723..bd7b9bc01d 100644 --- a/docs/lua_scripting.rst +++ b/docs/lua_scripting.rst @@ -92,19 +92,24 @@ Cluster Mode Cluster mode has limited support for lua scripting. -The following commands are supported, with caveats: - ``EVAL`` and -``EVALSHA``: The command is sent to the relevant node, depending on the -keys (i.e., in ``EVAL "