Skip to content

Commit aac241b

Browse files
committed
(fix): Separated retries for read and write operations
1 parent fa3a0ca commit aac241b

File tree

6 files changed

+156
-33
lines changed

6 files changed

+156
-33
lines changed

redis/asyncio/client.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -619,13 +619,6 @@ async def close(self, close_connection_pool: Optional[bool] = None) -> None:
619619
"""
620620
await self.aclose(close_connection_pool)
621621

622-
async def _send_command_parse_response(self, conn, command_name, *args, **options):
623-
"""
624-
Send a command and parse the response
625-
"""
626-
await conn.send_command(*args)
627-
return await self.parse_response(conn, command_name, **options)
628-
629622
async def _disconnect_raise(self, conn: Connection, error: Exception):
630623
"""
631624
Close the connection and raise an exception
@@ -650,10 +643,12 @@ async def execute_command(self, *args, **options):
650643
if self.single_connection_client:
651644
await self._single_conn_lock.acquire()
652645
try:
646+
await conn.retry.call_with_retry(
647+
lambda: conn.send_command(*args, **options),
648+
lambda error: self._disconnect_raise(conn, error),
649+
)
653650
return await conn.retry.call_with_retry(
654-
lambda: self._send_command_parse_response(
655-
conn, command_name, *args, **options
656-
),
651+
lambda: self.parse_response(conn, command_name, **options),
657652
lambda error: self._disconnect_raise(conn, error),
658653
)
659654
finally:
@@ -1378,11 +1373,13 @@ async def immediate_execute_command(self, *args, **options):
13781373
conn = await self.connection_pool.get_connection()
13791374
self.connection = conn
13801375

1376+
await conn.retry.call_with_retry(
1377+
lambda: conn.send_command(*args, **options),
1378+
lambda error: self._disconnect_raise(conn, error),
1379+
)
13811380
return await conn.retry.call_with_retry(
1382-
lambda: self._send_command_parse_response(
1383-
conn, command_name, *args, **options
1384-
),
1385-
lambda error: self._disconnect_reset_raise(conn, error),
1381+
lambda: self.parse_response(conn, command_name, **options),
1382+
lambda error: self._disconnect_raise(conn, error),
13861383
)
13871384

13881385
def pipeline_execute_command(self, *args, **options):

redis/client.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -590,13 +590,6 @@ def close(self) -> None:
590590
if self.auto_close_connection_pool:
591591
self.connection_pool.disconnect()
592592

593-
def _send_command_parse_response(self, conn, command_name, *args, **options):
594-
"""
595-
Send a command and parse the response
596-
"""
597-
conn.send_command(*args, **options)
598-
return self.parse_response(conn, command_name, **options)
599-
600593
def _disconnect_raise(self, conn, error):
601594
"""
602595
Close the connection and raise an exception
@@ -623,10 +616,12 @@ def _execute_command(self, *args, **options):
623616
if self._single_connection_client:
624617
self.single_connection_lock.acquire()
625618
try:
619+
conn.retry.call_with_retry(
620+
lambda: conn.send_command(*args, **options),
621+
lambda error: self._disconnect_raise(conn, error),
622+
)
626623
return conn.retry.call_with_retry(
627-
lambda: self._send_command_parse_response(
628-
conn, command_name, *args, **options
629-
),
624+
lambda: self.parse_response(conn, command_name, **options),
630625
lambda error: self._disconnect_raise(conn, error),
631626
)
632627
finally:
@@ -1408,11 +1403,13 @@ def immediate_execute_command(self, *args, **options):
14081403
conn = self.connection_pool.get_connection()
14091404
self.connection = conn
14101405

1406+
conn.retry.call_with_retry(
1407+
lambda: conn.send_command(*args, **options),
1408+
lambda error: self._disconnect_raise(conn, error),
1409+
)
14111410
return conn.retry.call_with_retry(
1412-
lambda: self._send_command_parse_response(
1413-
conn, command_name, *args, **options
1414-
),
1415-
lambda error: self._disconnect_reset_raise(conn, error),
1411+
lambda: self.parse_response(conn, command_name, **options),
1412+
lambda error: self._disconnect_raise(conn, error),
14161413
)
14171414

14181415
def pipeline_execute_command(self, *args, **options) -> "Pipeline":

tests/conftest.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@
2323
CacheKey,
2424
EvictionPolicy,
2525
)
26-
from redis.connection import Connection, ConnectionInterface, SSLConnection, parse_url
26+
from redis.connection import (
27+
Connection,
28+
ConnectionInterface,
29+
SSLConnection,
30+
parse_url,
31+
ConnectionPool,
32+
)
2733
from redis.credentials import CredentialProvider
2834
from redis.exceptions import RedisClusterException
2935
from redis.retry import Retry
@@ -582,6 +588,12 @@ def mock_connection() -> ConnectionInterface:
582588
return mock_connection
583589

584590

591+
@pytest.fixture()
592+
def mock_pool() -> ConnectionPool:
593+
mock_pool = Mock(spec=ConnectionPool)
594+
return mock_pool
595+
596+
585597
@pytest.fixture()
586598
def cache_key(request) -> CacheKey:
587599
command = request.param.get("command")

tests/test_asyncio/conftest.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import random
22
from contextlib import asynccontextmanager as _asynccontextmanager
33
from typing import Union
4+
from unittest.mock import Mock
45

56
import pytest
67
import pytest_asyncio
78
import redis.asyncio as redis
89
from packaging.version import Version
910
from redis.asyncio import Sentinel
1011
from redis.asyncio.client import Monitor
11-
from redis.asyncio.connection import Connection, parse_url
12+
from redis.asyncio.connection import Connection, parse_url, ConnectionPool
1213
from redis.asyncio.retry import Retry
1314
from redis.backoff import NoBackoff
1415
from redis.credentials import CredentialProvider
@@ -219,6 +220,18 @@ async def mock_cluster_resp_slaves(create_redis, **kwargs):
219220
yield mocked
220221

221222

223+
@pytest_asyncio.fixture()
224+
def mock_connection() -> Connection:
225+
mock_connection = Mock(spec=Connection)
226+
return mock_connection
227+
228+
229+
@pytest_asyncio.fixture()
230+
def mock_pool() -> ConnectionPool:
231+
mock_pool = Mock(spec=ConnectionPool)
232+
return mock_pool
233+
234+
222235
@pytest_asyncio.fixture()
223236
async def credential_provider(request) -> CredentialProvider:
224237
return get_credential_provider(request)

tests/test_asyncio/test_connection.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import socket
33
import types
44
from errno import ECONNREFUSED
5-
from unittest.mock import patch
5+
from unittest.mock import patch, call
66

77
import pytest
88
import redis
@@ -20,7 +20,7 @@
2020
parse_url,
2121
)
2222
from redis.asyncio.retry import Retry
23-
from redis.backoff import NoBackoff
23+
from redis.backoff import NoBackoff, ExponentialBackoff
2424
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
2525
from redis.utils import HIREDIS_AVAILABLE
2626
from tests.conftest import skip_if_server_version_lt
@@ -315,6 +315,57 @@ async def get_redis_connection():
315315
await r1.aclose()
316316

317317

318+
@pytest.mark.onlynoncluster
319+
async def test_client_do_not_retry_write_on_read_failure(mock_connection, mock_pool):
320+
mock_connection.send_command.return_value = True
321+
mock_connection.read_response.side_effect = [
322+
ConnectionError,
323+
ConnectionError,
324+
b"OK",
325+
]
326+
mock_connection.retry = Retry(ExponentialBackoff(), 3)
327+
mock_connection.retry_on_error = (ConnectionError,)
328+
mock_pool.get_connection.return_value = mock_connection
329+
mock_pool.connection_kwargs = {}
330+
331+
r = Redis(connection_pool=mock_pool, retry=Retry(ExponentialBackoff(), 3))
332+
await r.set("key", "value")
333+
334+
# If read from socket fails, writes won't be executed.
335+
mock_connection.send_command.assert_has_calls(
336+
[
337+
call("SET", "key", "value"),
338+
]
339+
)
340+
341+
342+
@pytest.mark.onlynoncluster
343+
async def test_pipeline_immediate_do_not_retry_write_on_read_failure(
344+
mock_connection, mock_pool
345+
):
346+
mock_connection.send_command.return_value = True
347+
mock_connection.read_response.side_effect = [
348+
ConnectionError,
349+
ConnectionError,
350+
b"OK",
351+
]
352+
mock_connection.retry = Retry(ExponentialBackoff(), 3)
353+
mock_connection.retry_on_error = (ConnectionError,)
354+
mock_pool.get_connection.return_value = mock_connection
355+
mock_pool.connection_kwargs = {}
356+
357+
r = Redis(connection_pool=mock_pool, retry=Retry(ExponentialBackoff(), 3))
358+
pipe = r.pipeline(transaction=False)
359+
await pipe.immediate_execute_command("SET", "key", "value")
360+
361+
# If read from socket fails, writes won't be executed.
362+
mock_connection.send_command.assert_has_calls(
363+
[
364+
call("SET", "key", "value"),
365+
]
366+
)
367+
368+
318369
async def test_close_is_aclose(request):
319370
"""Verify close() calls aclose()"""
320371
calls = 0

tests/test_connection.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
from unittest.mock import call, patch
1111

1212
import pytest
13+
from requests.cookies import MockResponse
14+
1315
import redis
1416
from redis import ConnectionPool, Redis
1517
from redis._parsers import _HiredisParser, _RESP2Parser, _RESP3Parser
16-
from redis.backoff import NoBackoff
18+
from redis.backoff import NoBackoff, ExponentialBackoff
1719
from redis.cache import (
1820
CacheConfig,
1921
CacheEntry,
@@ -249,6 +251,57 @@ def get_redis_connection():
249251
r1.close()
250252

251253

254+
@pytest.mark.onlynoncluster
255+
def test_client_do_not_retry_write_on_read_failure(mock_connection, mock_pool):
256+
mock_connection.send_command.return_value = True
257+
mock_connection.read_response.side_effect = [
258+
ConnectionError,
259+
ConnectionError,
260+
b"OK",
261+
]
262+
mock_connection.retry = Retry(ExponentialBackoff(), 3)
263+
mock_connection.retry_on_error = (ConnectionError,)
264+
mock_pool.get_connection.return_value = mock_connection
265+
mock_pool.connection_kwargs = {}
266+
267+
r = Redis(connection_pool=mock_pool, retry=Retry(ExponentialBackoff(), 3))
268+
r.set("key", "value")
269+
270+
# If read from socket fails, writes won't be executed.
271+
mock_connection.send_command.assert_has_calls(
272+
[
273+
call("SET", "key", "value"),
274+
]
275+
)
276+
277+
278+
@pytest.mark.onlynoncluster
279+
def test_pipeline_immediate_do_not_retry_write_on_read_failure(
280+
mock_connection, mock_pool
281+
):
282+
mock_connection.send_command.return_value = True
283+
mock_connection.read_response.side_effect = [
284+
ConnectionError,
285+
ConnectionError,
286+
b"OK",
287+
]
288+
mock_connection.retry = Retry(ExponentialBackoff(), 3)
289+
mock_connection.retry_on_error = (ConnectionError,)
290+
mock_pool.get_connection.return_value = mock_connection
291+
mock_pool.connection_kwargs = {}
292+
293+
r = Redis(connection_pool=mock_pool, retry=Retry(ExponentialBackoff(), 3))
294+
pipe = r.pipeline(transaction=False)
295+
pipe.immediate_execute_command("SET", "key", "value")
296+
297+
# If read from socket fails, writes won't be executed.
298+
mock_connection.send_command.assert_has_calls(
299+
[
300+
call("SET", "key", "value"),
301+
]
302+
)
303+
304+
252305
@pytest.mark.skipif(sys.version_info == (3, 9), reason="Flacky test on Python 3.9")
253306
@pytest.mark.parametrize("from_url", (True, False), ids=("from_url", "from_args"))
254307
def test_redis_connection_pool(request, from_url):

0 commit comments

Comments
 (0)