Skip to content

Commit 2c0c812

Browse files
committed
Finished CacheProxyConnection implementation, added comments
1 parent 48607e9 commit 2c0c812

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

redis/connection.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -760,59 +760,56 @@ def check_health(self):
760760
self._conn.check_health()
761761

762762
def send_packed_command(self, command, check_health=True):
763-
cache_key = hashkey(command)
764-
765-
if self._cache.get(cache_key):
766-
self._current_command_hash = cache_key
767-
return
768-
769-
self._current_command_hash = None
763+
self._process_pending_invalidations()
764+
# TODO: Investigate if it's possible to unpack command or extract keys from packed command
770765
self._conn.send_packed_command(command)
771766

772767
def send_command(self, *args, **kwargs):
768+
self._process_pending_invalidations()
769+
770+
# If command is write command or not allowed to cache skip it.
773771
if not self._conf.is_allowed_to_cache(args[0]):
774772
self._current_command_hash = None
775773
self._current_command_keys = None
776774
self._conn.send_command(*args, **kwargs)
777775
return
778776

777+
# Create hash representation of current executed command.
779778
self._current_command_hash = hashkey(*args)
780779

780+
# Extract keys from current command.
781781
if kwargs.get("keys"):
782782
self._current_command_keys = kwargs["keys"]
783783

784784
if not isinstance(self._current_command_keys, list):
785785
raise TypeError("Cache keys must be a list.")
786786

787+
# If current command reply already cached prevent sending data over socket.
787788
if self._cache.get(self._current_command_hash):
788789
return
789790

791+
# Send command over socket only if it's read-only command that not yet cached.
790792
self._conn.send_command(*args, **kwargs)
791793

792794
def can_read(self, timeout=0):
793795
return self._conn.can_read(timeout)
794796

795797
def read_response(self, disable_decoding=False, *, disconnect_on_error=True, push_request=False):
798+
# Check if command response exists in a cache.
799+
if self._current_command_hash in self._cache:
800+
return self._cache[self._current_command_hash]
801+
796802
response = self._conn.read_response(
797803
disable_decoding=disable_decoding,
798804
disconnect_on_error=disconnect_on_error,
799805
push_request=push_request
800806
)
801807

802-
if isinstance(response, List) and len(response) > 0 and response[0] == 'invalidate':
803-
self._on_invalidation_callback(response)
804-
self.read_response(
805-
disable_decoding=disable_decoding,
806-
disconnect_on_error=disconnect_on_error,
807-
push_request=push_request
808-
)
809-
808+
# Check if command that was sent is write command to prevent caching of write replies.
810809
if response is None or self._current_command_hash is None:
811810
return response
812811

813-
if self._current_command_hash in self._cache:
814-
return self._cache[self._current_command_hash]
815-
812+
# Create separate mapping for keys or add current response to associated keys.
816813
for key in self._current_command_keys:
817814
if key in self._keys_mapping:
818815
if self._current_command_hash not in self._keys_mapping[key]:
@@ -824,10 +821,10 @@ def read_response(self, disable_decoding=False, *, disconnect_on_error=True, pus
824821
return response
825822

826823
def pack_command(self, *args):
827-
pass
824+
return self._conn.pack_command(*args)
828825

829826
def pack_commands(self, commands):
830-
pass
827+
return self._conn.pack_commands(commands)
831828

832829
def _connect(self):
833830
self._conn._connect()
@@ -838,24 +835,27 @@ def _host_error(self):
838835
def _enable_tracking_callback(self, conn: ConnectionInterface) -> None:
839836
conn.send_command('CLIENT', 'TRACKING', 'ON')
840837
conn.read_response()
838+
conn._parser.set_invalidation_push_handler(self._on_invalidation_callback)
841839

842840
def _process_pending_invalidations(self):
843-
print(f'connection {self} {id(self)} process invalidations')
844841
while self.can_read():
845-
self.read_response(push_request=True)
842+
self._conn.read_response(push_request=True)
846843

847844
def _on_invalidation_callback(
848845
self, data: List[Union[str, Optional[List[str]]]]
849846
):
847+
# Flush cache when DB flushed on server-side
850848
if data[1] is None:
851849
self._cache.clear()
852850
else:
853851
for key in data[1]:
854852
normalized_key = ensure_string(key)
855853
if normalized_key in self._keys_mapping:
854+
# Make sure that all command responses associated with this key will be deleted
856855
for cache_key in self._keys_mapping[normalized_key]:
857856
self._cache.pop(cache_key)
858-
857+
# Removes key from mapping cache
858+
self._keys_mapping.pop(normalized_key)
859859

860860

861861
class SSLConnection(Connection):
@@ -1235,7 +1235,6 @@ def __init__(
12351235
connection_kwargs.pop("cache_ttl", None)
12361236
connection_kwargs.pop("cache", None)
12371237

1238-
12391238
# a lock to protect the critical section in _checkpid().
12401239
# this lock is acquired when the process id changes, such as
12411240
# after a fork. during this time, multiple threads in the child
@@ -1343,7 +1342,7 @@ def get_connection(self, command_name: str, *keys, **options) -> "Connection":
13431342
# pool before all data has been read or the socket has been
13441343
# closed. either way, reconnect and verify everything is good.
13451344
try:
1346-
if connection.can_read():
1345+
if connection.can_read() and self._cache is None:
13471346
raise ConnectionError("Connection has data")
13481347
except (ConnectionError, OSError):
13491348
connection.disconnect()

0 commit comments

Comments
 (0)