35
35
TimeoutError ,
36
36
)
37
37
from .retry import Retry
38
- from .scheduler import Scheduler
39
38
from .utils import (
40
39
CRYPTOGRAPHY_AVAILABLE ,
41
40
HIREDIS_AVAILABLE ,
@@ -741,12 +740,18 @@ class CacheProxyConnection(ConnectionInterface):
741
740
MIN_ALLOWED_VERSION = "7.4.0"
742
741
DEFAULT_SERVER_NAME = "redis"
743
742
744
- def __init__ (self , conn : ConnectionInterface , cache : CacheInterface ):
743
+ def __init__ (
744
+ self ,
745
+ conn : ConnectionInterface ,
746
+ cache : CacheInterface ,
747
+ pool_lock : threading .Lock ,
748
+ ):
745
749
self .pid = os .getpid ()
746
750
self ._conn = conn
747
751
self .retry = self ._conn .retry
748
752
self .host = self ._conn .host
749
753
self .port = self ._conn .port
754
+ self ._pool_lock = pool_lock
750
755
self ._cache = cache
751
756
self ._cache_lock = threading .Lock ()
752
757
self ._current_command_cache_key = None
@@ -824,9 +829,17 @@ def send_command(self, *args, **kwargs):
824
829
)
825
830
826
831
with self ._cache_lock :
827
- # If current command reply already cached
828
- # prevent sending data over socket.
832
+ # We have to trigger invalidation processing in case if
833
+ # it was cached by another connection to avoid
834
+ # queueing invalidations in stale connections.
829
835
if self ._cache .get (self ._current_command_cache_key ):
836
+ entry = self ._cache .get (self ._current_command_cache_key )
837
+
838
+ if entry .connection_ref != self ._conn :
839
+ with self ._pool_lock :
840
+ while entry .connection_ref .can_read ():
841
+ entry .connection_ref .read_response (push_request = True )
842
+
830
843
return
831
844
832
845
# Set temporary entry value to prevent
@@ -836,6 +849,7 @@ def send_command(self, *args, **kwargs):
836
849
cache_key = self ._current_command_cache_key ,
837
850
cache_value = self .DUMMY_CACHE_VALUE ,
838
851
status = CacheEntryStatus .IN_PROGRESS ,
852
+ connection_ref = self ._conn ,
839
853
)
840
854
)
841
855
@@ -857,7 +871,9 @@ def read_response(
857
871
and self ._cache .get (self ._current_command_cache_key ).status
858
872
!= CacheEntryStatus .IN_PROGRESS
859
873
):
860
- return self ._cache .get (self ._current_command_cache_key ).cache_value
874
+ return copy .deepcopy (
875
+ self ._cache .get (self ._current_command_cache_key ).cache_value
876
+ )
861
877
862
878
response = self ._conn .read_response (
863
879
disable_decoding = disable_decoding ,
@@ -879,13 +895,9 @@ def read_response(
879
895
# Cache only responses that still valid
880
896
# and wasn't invalidated by another connection in meantime.
881
897
if cache_entry is not None :
882
- self ._cache .set (
883
- CacheEntry (
884
- cache_key = self ._current_command_cache_key ,
885
- cache_value = response ,
886
- status = CacheEntryStatus .VALID ,
887
- )
888
- )
898
+ cache_entry .status = CacheEntryStatus .VALID
899
+ cache_entry .cache_value = response
900
+ self ._cache .set (cache_entry )
889
901
890
902
return response
891
903
@@ -1284,9 +1296,6 @@ def __init__(
1284
1296
self .max_connections = max_connections
1285
1297
self .cache = None
1286
1298
self ._cache_factory = cache_factory
1287
- self ._scheduler = None
1288
- self ._hc_cancel_event = None
1289
- self ._hc_thread = None
1290
1299
1291
1300
if connection_kwargs .get ("cache_config" ) or connection_kwargs .get ("cache" ):
1292
1301
if connection_kwargs .get ("protocol" ) not in [3 , "3" ]:
@@ -1307,8 +1316,6 @@ def __init__(
1307
1316
self .connection_kwargs .get ("cache_config" )
1308
1317
).get_cache ()
1309
1318
1310
- self ._scheduler = Scheduler ()
1311
-
1312
1319
connection_kwargs .pop ("cache" , None )
1313
1320
connection_kwargs .pop ("cache_config" , None )
1314
1321
@@ -1322,7 +1329,6 @@ def __init__(
1322
1329
# release the lock.
1323
1330
self ._fork_lock = threading .Lock ()
1324
1331
self .reset ()
1325
- self .run_scheduled_healthcheck ()
1326
1332
1327
1333
def __repr__ (self ) -> (str , str ):
1328
1334
return (
@@ -1452,7 +1458,7 @@ def make_connection(self) -> "ConnectionInterface":
1452
1458
1453
1459
if self .cache is not None :
1454
1460
return CacheProxyConnection (
1455
- self .connection_class (** self .connection_kwargs ), self .cache
1461
+ self .connection_class (** self .connection_kwargs ), self .cache , self . _lock
1456
1462
)
1457
1463
1458
1464
return self .connection_class (** self .connection_kwargs )
@@ -1501,8 +1507,6 @@ def disconnect(self, inuse_connections: bool = True) -> None:
1501
1507
for connection in connections :
1502
1508
connection .disconnect ()
1503
1509
1504
- self .stop_scheduled_healthcheck ()
1505
-
1506
1510
def close (self ) -> None :
1507
1511
"""Close the pool, disconnecting all connections"""
1508
1512
self .disconnect ()
@@ -1514,33 +1518,6 @@ def set_retry(self, retry: "Retry") -> None:
1514
1518
for conn in self ._in_use_connections :
1515
1519
conn .retry = retry
1516
1520
1517
- def run_scheduled_healthcheck (self ) -> None :
1518
- # Run scheduled healthcheck to avoid stale invalidations in idle connections.
1519
- if self .cache is not None and self ._scheduler is not None :
1520
- self ._hc_cancel_event = threading .Event ()
1521
- hc_interval = self .cache .get_config ().get_health_check_interval ()
1522
- self ._hc_thread = self ._scheduler .run_with_interval (
1523
- self ._perform_health_check , hc_interval , self ._hc_cancel_event
1524
- )
1525
-
1526
- def stop_scheduled_healthcheck (self ) -> None :
1527
- # Send an event to stop scheduled healthcheck execution.
1528
- if self ._hc_cancel_event is not None and not self ._hc_cancel_event .is_set ():
1529
- self ._hc_cancel_event .set ()
1530
-
1531
- # Joins healthcheck thread on disconnect.
1532
- if self ._hc_thread is not None and not self ._hc_thread .is_alive ():
1533
- self ._hc_thread .join ()
1534
-
1535
- def _perform_health_check (self , done : threading .Event ) -> None :
1536
- self ._checkpid ()
1537
- with self ._lock :
1538
- while self ._available_connections :
1539
- conn = self ._available_connections .pop ()
1540
- conn .send_command ("PING" )
1541
- conn .read_response ()
1542
- done .set ()
1543
-
1544
1521
1545
1522
class BlockingConnectionPool (ConnectionPool ):
1546
1523
"""
@@ -1620,7 +1597,7 @@ def make_connection(self):
1620
1597
"Make a fresh connection."
1621
1598
if self .cache is not None :
1622
1599
connection = CacheProxyConnection (
1623
- self .connection_class (** self .connection_kwargs ), self .cache
1600
+ self .connection_class (** self .connection_kwargs ), self .cache , self . _lock
1624
1601
)
1625
1602
else :
1626
1603
connection = self .connection_class (** self .connection_kwargs )
@@ -1705,5 +1682,3 @@ def disconnect(self):
1705
1682
self ._checkpid ()
1706
1683
for connection in self ._connections :
1707
1684
connection .disconnect ()
1708
-
1709
- self .stop_scheduled_healthcheck ()
0 commit comments