Skip to content

Commit 688ad57

Browse files
committed
fix cache proxy connection unbalanced command send and read
1 parent a006e22 commit 688ad57

File tree

2 files changed

+40
-34
lines changed

2 files changed

+40
-34
lines changed

redis/connection.py

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,7 +1365,7 @@ def __init__(
13651365
self._pool_lock = pool_lock
13661366
self._cache = cache
13671367
self._cache_lock = threading.RLock()
1368-
self._current_command_cache_key = None
1368+
self._current_command_cache_entry = None
13691369
self._current_options = None
13701370
self.register_connect_callback(self._enable_tracking_callback)
13711371

@@ -1453,42 +1453,49 @@ def send_command(self, *args, **kwargs):
14531453
if not self._cache.is_cachable(
14541454
CacheKey(command=args[0], redis_keys=(), redis_args=())
14551455
):
1456-
self._current_command_cache_key = None
1456+
self._current_command_cache_entry = None
14571457
self._conn.send_command(*args, **kwargs)
14581458
return
14591459

14601460
if kwargs.get("keys") is None:
14611461
raise ValueError("Cannot create cache key.")
14621462

1463-
# Creates cache key.
1464-
self._current_command_cache_key = CacheKey(
1463+
cache_key = CacheKey(
14651464
command=args[0], redis_keys=tuple(kwargs.get("keys")), redis_args=args
14661465
)
1466+
self._current_command_cache_entry = None
14671467

14681468
with self._cache_lock:
1469-
# We have to trigger invalidation processing in case if
1470-
# it was cached by another connection to avoid
1471-
# queueing invalidations in stale connections.
1472-
if self._cache.get(self._current_command_cache_key):
1473-
entry = self._cache.get(self._current_command_cache_key)
1474-
1475-
if entry.connection_ref != self._conn:
1469+
cache_entry = self._cache.get(cache_key)
1470+
if cache_entry is not None and cache_entry.status == CacheEntryStatus.VALID:
1471+
# We have to trigger invalidation processing in case if
1472+
# it was cached by another connection to avoid
1473+
# queueing invalidations in stale connections.
1474+
if cache_entry.connection_ref != self._conn:
14761475
with self._pool_lock:
1477-
while entry.connection_ref.can_read():
1478-
entry.connection_ref.read_response(push_request=True)
1479-
1480-
return
1476+
while cache_entry.connection_ref.can_read():
1477+
cache_entry.connection_ref.read_response(push_request=True)
1478+
# Check if entry still exists.
1479+
if self._cache.get(cache_key) is not None:
1480+
self._current_command_cache_entry = cache_entry
1481+
return
1482+
cache_entry = None
1483+
else:
1484+
self._current_command_cache_entry = cache_entry
1485+
return
14811486

1482-
# Set temporary entry value to prevent
1483-
# race condition from another connection.
1484-
self._cache.set(
1485-
CacheEntry(
1486-
cache_key=self._current_command_cache_key,
1487+
if cache_entry is None:
1488+
# Creates cache entry.
1489+
cache_entry = CacheEntry(
1490+
cache_key=cache_key,
14871491
cache_value=self.DUMMY_CACHE_VALUE,
14881492
status=CacheEntryStatus.IN_PROGRESS,
14891493
connection_ref=self._conn,
14901494
)
1491-
)
1495+
# Set temporary entry value to prevent
1496+
# race condition from another connection.
1497+
self._cache.set(cache_entry)
1498+
self._current_command_cache_entry = cache_entry
14921499

14931500
# Send command over socket only if it's allowed
14941501
# read-only command that not yet cached.
@@ -1501,17 +1508,15 @@ def read_response(
15011508
self, disable_decoding=False, *, disconnect_on_error=True, push_request=False
15021509
):
15031510
with self._cache_lock:
1504-
# Check if command response exists in a cache and it's not in progress.
1511+
# Check if command response cache entry exists and it's valid.
15051512
if (
1506-
self._current_command_cache_key is not None
1507-
and self._cache.get(self._current_command_cache_key) is not None
1508-
and self._cache.get(self._current_command_cache_key).status
1509-
!= CacheEntryStatus.IN_PROGRESS
1513+
self._current_command_cache_entry is not None
1514+
and self._current_command_cache_entry.status == CacheEntryStatus.VALID
15101515
):
15111516
res = copy.deepcopy(
1512-
self._cache.get(self._current_command_cache_key).cache_value
1517+
self._current_command_cache_entry.cache_value
15131518
)
1514-
self._current_command_cache_key = None
1519+
self._current_command_cache_entry = None
15151520
return res
15161521

15171522
response = self._conn.read_response(
@@ -1522,23 +1527,24 @@ def read_response(
15221527

15231528
with self._cache_lock:
15241529
# Prevent not-allowed command from caching.
1525-
if self._current_command_cache_key is None:
1530+
if self._current_command_cache_entry is None:
15261531
return response
15271532
# If response is None prevent from caching.
1533+
cache_key = self._current_command_cache_entry.cache_key
15281534
if response is None:
1529-
self._cache.delete_by_cache_keys([self._current_command_cache_key])
1535+
self._cache.delete_by_cache_keys([cache_key])
15301536
return response
15311537

1532-
cache_entry = self._cache.get(self._current_command_cache_key)
1538+
cache_entry = self._cache.get(cache_key)
15331539

15341540
# Cache only responses that still valid
15351541
# and wasn't invalidated by another connection in meantime.
1536-
if cache_entry is not None:
1542+
if cache_entry is self._current_command_cache_entry:
15371543
cache_entry.status = CacheEntryStatus.VALID
15381544
cache_entry.cache_value = response
15391545
self._cache.set(cache_entry)
15401546

1541-
self._current_command_cache_key = None
1547+
self._current_command_cache_entry = None
15421548

15431549
return response
15441550

tests/test_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ def test_read_response_returns_cached_reply(self, mock_cache, mock_connection):
526526
)
527527
proxy_connection.send_command(*["GET", "foo"], **{"keys": ["foo"]})
528528
assert proxy_connection.read_response() == b"bar"
529-
assert proxy_connection._current_command_cache_key is None
529+
assert proxy_connection._current_command_cache_entry is None
530530
assert proxy_connection.read_response() == b"bar"
531531

532532
mock_cache.set.assert_has_calls(

0 commit comments

Comments
 (0)