@@ -1555,7 +1555,7 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
1555
1555
| Existing :class:`~.RedisCluster` client
1556
1556
"""
1557
1557
1558
- __slots__ = ("cluster_client" ,)
1558
+ __slots__ = ("cluster_client" , "_transaction" , "_execution_strategy" )
1559
1559
1560
1560
def __init__ (
1561
1561
self , client : RedisCluster , transaction : Optional [bool ] = None
@@ -1570,15 +1570,15 @@ def __init__(
1570
1570
1571
1571
async def initialize (self ) -> "ClusterPipeline" :
1572
1572
if self .cluster_client ._initialize :
1573
- await self .cluster_client .initialize ()
1573
+ await self ._execution_strategy .initialize ()
1574
1574
self ._execution_strategy ._command_queue = []
1575
1575
return self
1576
1576
1577
1577
async def __aenter__ (self ) -> "ClusterPipeline" :
1578
1578
return await self .initialize ()
1579
1579
1580
1580
async def __aexit__ (self , exc_type : None , exc_value : None , traceback : None ) -> None :
1581
- self ._execution_strategy . _command_queue = []
1581
+ await self .reset ()
1582
1582
1583
1583
def __await__ (self ) -> Generator [Any , None , "ClusterPipeline" ]:
1584
1584
return self .initialize ().__await__ ()
@@ -1595,7 +1595,7 @@ def __bool__(self) -> bool:
1595
1595
return True
1596
1596
1597
1597
def __len__ (self ) -> int :
1598
- return len (self ._execution_strategy . _command_queue )
1598
+ return len (self ._execution_strategy )
1599
1599
1600
1600
def execute_command (
1601
1601
self , * args : Union [KeyT , EncodableT ], ** kwargs : Any
@@ -1794,32 +1794,16 @@ async def unlink(self, *names):
1794
1794
"""
1795
1795
pass
1796
1796
1797
+ @abstractmethod
1798
+ def __len__ (self ) -> int :
1799
+ pass
1800
+
1797
1801
1798
1802
class AbstractStrategy (ExecutionStrategy ):
1799
1803
def __init__ (self , pipe : ClusterPipeline ) -> None :
1800
1804
self ._pipe : ClusterPipeline = pipe
1801
1805
self ._command_queue : List ["PipelineCommand" ] = []
1802
1806
1803
- async def __aenter__ (self ) -> "ClusterPipeline" :
1804
- return await self ._pipe .initialize ()
1805
-
1806
- async def __aexit__ (self , exc_type : None , exc_value : None , traceback : None ) -> None :
1807
- self ._command_queue = []
1808
-
1809
- def __await__ (self ) -> Generator [Any , None , "ClusterPipeline" ]:
1810
- return self ._pipe .initialize ().__await__ ()
1811
-
1812
- def __enter__ (self ) -> "ClusterPipeline" :
1813
- self ._command_queue = []
1814
- return self ._pipe
1815
-
1816
- def __exit__ (self , exc_type : None , exc_value : None , traceback : None ) -> None :
1817
- self ._command_queue = []
1818
-
1819
- def __bool__ (self ) -> bool :
1820
- "Pipeline instances should always evaluate to True on Python 3+"
1821
- return True
1822
-
1823
1807
async def initialize (self ) -> "ClusterPipeline" :
1824
1808
if self ._pipe .cluster_client ._initialize :
1825
1809
await self ._pipe .cluster_client .initialize ()
@@ -1881,6 +1865,9 @@ async def discard(self):
1881
1865
async def unlink (self , * names ):
1882
1866
pass
1883
1867
1868
+ def __len__ (self ) -> int :
1869
+ return len (self ._command_queue )
1870
+
1884
1871
1885
1872
class PipelineStrategy (AbstractStrategy ):
1886
1873
def __init__ (self , pipe : ClusterPipeline ) -> None :
@@ -1931,7 +1918,7 @@ async def execute(
1931
1918
# All other errors should be raised.
1932
1919
raise e
1933
1920
finally :
1934
- self ._command_queue = []
1921
+ await self .reset ()
1935
1922
1936
1923
async def _execute (
1937
1924
self ,
0 commit comments