Skip to content

Commit f50299e

Browse files
committed
Fixed async teardown
1 parent ca8166c commit f50299e

File tree

5 files changed

+39
-28
lines changed

5 files changed

+39
-28
lines changed

redis/asyncio/multidb/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state:
305305
if old_state == CBState.CLOSED and new_state == CBState.OPEN:
306306
loop.call_later(DEFAULT_GRACE_PERIOD, _half_open_circuit, circuit)
307307

308+
async def aclose(self):
309+
await self.command_executor.active_database.client.aclose()
310+
308311
def _half_open_circuit(circuit: CircuitBreaker):
309312
circuit.state = CBState.HALF_OPEN
310313

redis/multidb/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,9 @@ def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state:
271271
if old_state == CBState.CLOSED and new_state == CBState.OPEN:
272272
self._bg_scheduler.run_once(DEFAULT_GRACE_PERIOD, _half_open_circuit, circuit)
273273

274+
def close(self):
275+
self.command_executor.active_database.client.close()
276+
274277
def _half_open_circuit(circuit: CircuitBreaker):
275278
circuit.state = CBState.HALF_OPEN
276279

tests/test_asyncio/test_scenario/conftest.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import asyncio
12
import os
3+
from typing import Any, AsyncGenerator
24

35
import pytest
46
import pytest_asyncio
57

68
from redis.asyncio import Redis
9+
from redis.asyncio.multidb.client import MultiDBClient
710
from redis.asyncio.multidb.config import DEFAULT_FAILURES_THRESHOLD, DEFAULT_HEALTH_CHECK_INTERVAL, DatabaseConfig, \
811
MultiDbConfig
912
from redis.asyncio.multidb.event import AsyncActiveDatabaseChanged
@@ -27,7 +30,7 @@ def fault_injector_client():
2730
return FaultInjectorClient(url)
2831

2932
@pytest_asyncio.fixture()
30-
async def r_multi_db(request) -> tuple[MultiDbConfig, CheckActiveDatabaseChangedListener, dict]:
33+
async def r_multi_db(request) -> AsyncGenerator[tuple[MultiDBClient, CheckActiveDatabaseChangedListener, Any], Any]:
3134
client_class = request.param.get('client_class', Redis)
3235

3336
if client_class == Redis:
@@ -38,7 +41,7 @@ async def r_multi_db(request) -> tuple[MultiDbConfig, CheckActiveDatabaseChanged
3841
username = endpoint_config.get('username', None)
3942
password = endpoint_config.get('password', None)
4043
failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD)
41-
command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=1, base=0.05), retries=10))
44+
command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.1, base=0.01), retries=10))
4245

4346
# Retry configuration different for health checks as initial health check require more time in case
4447
# if infrastructure wasn't restored from the previous test.
@@ -86,4 +89,11 @@ async def r_multi_db(request) -> tuple[MultiDbConfig, CheckActiveDatabaseChanged
8689
event_dispatcher=event_dispatcher,
8790
)
8891

89-
return config, listener, endpoint_config
92+
client = MultiDBClient(config)
93+
94+
async def teardown():
95+
await client.aclose()
96+
await asyncio.sleep(15)
97+
98+
yield client, listener, endpoint_config
99+
await teardown()

tests/test_asyncio/test_scenario/test_active_active.py

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,6 @@ async def trigger_network_failure_action(fault_injector_client, config, event: a
3939
logger.info(f"Action completed. Status: {status_result['status']}")
4040

4141
class TestActiveActive:
42-
43-
def teardown_method(self, method):
44-
# Timeout so the cluster could recover from network failure.
45-
sleep(15)
46-
4742
@pytest.mark.asyncio
4843
@pytest.mark.parametrize(
4944
"r_multi_db",
@@ -54,9 +49,9 @@ def teardown_method(self, method):
5449
ids=["standalone", "cluster"],
5550
indirect=True
5651
)
57-
@pytest.mark.timeout(100)
52+
@pytest.mark.timeout(200)
5853
async def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector_client):
59-
client_config, listener, endpoint_config = r_multi_db
54+
client, listener, endpoint_config = r_multi_db
6055

6156
# Handle unavailable databases from previous test.
6257
retry = Retry(
@@ -65,7 +60,7 @@ async def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_in
6560
backoff=ConstantBackoff(backoff=DEFAULT_FAILOVER_DELAY)
6661
)
6762

68-
async with MultiDBClient(client_config) as r_multi_db:
63+
async with client as r_multi_db:
6964
event = asyncio.Event()
7065
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
7166

@@ -116,16 +111,16 @@ async def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_in
116111
ids=["standalone", "cluster"],
117112
indirect=True
118113
)
119-
@pytest.mark.timeout(100)
114+
@pytest.mark.timeout(200)
120115
async def test_multi_db_client_uses_lag_aware_health_check(self, r_multi_db, fault_injector_client):
121-
client_config, listener, endpoint_config = r_multi_db
116+
client, listener, endpoint_config = r_multi_db
122117
retry = Retry(
123118
supported_errors=(TemporaryUnavailableException,),
124119
retries=DEFAULT_FAILOVER_ATTEMPTS,
125120
backoff=ConstantBackoff(backoff=DEFAULT_FAILOVER_DELAY)
126121
)
127122

128-
async with MultiDBClient(client_config) as r_multi_db:
123+
async with client as r_multi_db:
129124
event = asyncio.Event()
130125
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
131126

@@ -160,9 +155,9 @@ async def test_multi_db_client_uses_lag_aware_health_check(self, r_multi_db, fau
160155
ids=["standalone", "cluster"],
161156
indirect=True
162157
)
163-
@pytest.mark.timeout(100)
158+
@pytest.mark.timeout(200)
164159
async def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault_injector_client):
165-
client_config, listener, endpoint_config = r_multi_db
160+
client, listener, endpoint_config = r_multi_db
166161
retry = Retry(
167162
supported_errors=(TemporaryUnavailableException,),
168163
retries=DEFAULT_FAILOVER_ATTEMPTS,
@@ -179,7 +174,7 @@ async def callback():
179174
pipe.get('{hash}key3')
180175
assert await pipe.execute() == [True, True, True, 'value1', 'value2', 'value3']
181176

182-
async with MultiDBClient(client_config) as r_multi_db:
177+
async with client as r_multi_db:
183178
event = asyncio.Event()
184179
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
185180

@@ -209,9 +204,9 @@ async def callback():
209204
ids=["standalone", "cluster"],
210205
indirect=True
211206
)
212-
@pytest.mark.timeout(100)
207+
@pytest.mark.timeout(200)
213208
async def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_injector_client):
214-
client_config, listener, endpoint_config = r_multi_db
209+
client, listener, endpoint_config = r_multi_db
215210
retry = Retry(
216211
supported_errors=(TemporaryUnavailableException,),
217212
retries=DEFAULT_FAILOVER_ATTEMPTS,
@@ -228,7 +223,7 @@ async def callback():
228223
pipe.get('{hash}key3')
229224
assert await pipe.execute() == [True, True, True, 'value1', 'value2', 'value3']
230225

231-
async with MultiDBClient(client_config) as r_multi_db:
226+
async with client as r_multi_db:
232227
event = asyncio.Event()
233228
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
234229

@@ -258,9 +253,9 @@ async def callback():
258253
ids=["standalone", "cluster"],
259254
indirect=True
260255
)
261-
@pytest.mark.timeout(100)
256+
@pytest.mark.timeout(200)
262257
async def test_transaction_failover_to_another_db(self, r_multi_db, fault_injector_client):
263-
client_config, listener, endpoint_config = r_multi_db
258+
client, listener, endpoint_config = r_multi_db
264259

265260
retry = Retry(
266261
supported_errors=(TemporaryUnavailableException,),
@@ -276,7 +271,7 @@ async def callback(pipe: Pipeline):
276271
pipe.get('{hash}key2')
277272
pipe.get('{hash}key3')
278273

279-
async with MultiDBClient(client_config) as r_multi_db:
274+
async with client as r_multi_db:
280275
event = asyncio.Event()
281276
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
282277

@@ -302,9 +297,9 @@ async def callback(pipe: Pipeline):
302297
[{"failure_threshold": 2}],
303298
indirect=True
304299
)
305-
@pytest.mark.timeout(60)
300+
@pytest.mark.timeout(200)
306301
async def test_pubsub_failover_to_another_db(self, r_multi_db, fault_injector_client):
307-
client_config, listener, endpoint_config = r_multi_db
302+
client, listener, endpoint_config = r_multi_db
308303
retry = Retry(
309304
supported_errors=(TemporaryUnavailableException,),
310305
retries=DEFAULT_FAILOVER_ATTEMPTS,
@@ -318,7 +313,7 @@ async def handler(message):
318313
nonlocal messages_count
319314
messages_count += 1
320315

321-
async with MultiDBClient(client_config) as r_multi_db:
316+
async with client as r_multi_db:
322317
event = asyncio.Event()
323318
asyncio.create_task(trigger_network_failure_action(fault_injector_client, endpoint_config, event))
324319

@@ -358,4 +353,4 @@ async def handler(message):
358353
await asyncio.sleep(0.1)
359354
task.cancel()
360355
await pubsub.unsubscribe('test-channel') is True
361-
assert messages_count >= 5
356+
assert messages_count >= 2

tests/test_scenario/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen
7575
username = endpoint_config.get('username', None)
7676
password = endpoint_config.get('password', None)
7777
failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD)
78-
command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=1, base=0.05), retries=10))
78+
command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.1, base=0.01), retries=10))
7979

8080
# Retry configuration different for health checks as initial health check require more time in case
8181
# if infrastructure wasn't restored from the previous test.

0 commit comments

Comments
 (0)