Skip to content

Commit 7baad68

Browse files
author
Andrei Neagu
committed
fixed redis client sdk being stuck
1 parent 68a6671 commit 7baad68

File tree

19 files changed

+31
-16
lines changed

19 files changed

+31
-16
lines changed

packages/celery-library/src/celery_library/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ async def create_task_manager(
4747
),
4848
client_name="celery_tasks",
4949
)
50+
await redis_client_sdk.setup()
51+
#
5052

5153
return CeleryTaskManager(
5254
app,

packages/service-library/src/servicelib/fastapi/redis_lifespan.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ async def redis_client_sdk_lifespan(_: FastAPI, state: State) -> AsyncIterator[S
5151
redis_dsn_with_secrets,
5252
client_name=redis_state.REDIS_CLIENT_NAME,
5353
)
54+
await redis_client.setup()
5455

5556
try:
5657
yield {"REDIS_CLIENT_SDK": redis_client, **called_state}

packages/service-library/src/servicelib/long_running_tasks/_store/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ async def setup(self) -> None:
3636
"""Setup the store, if needed."""
3737

3838
@abstractmethod
39-
async def teardown(self) -> None:
40-
"""Teardown the store, if needed."""
39+
async def shutdown(self) -> None:
40+
"""Shutdown the store, if needed."""

packages/service-library/src/servicelib/long_running_tasks/_store/in_memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def __init__(self, *args, **kwargs):
1212
async def setup(self) -> None:
1313
pass
1414

15-
async def teardown(self) -> None:
15+
async def shutdown(self) -> None:
1616
pass
1717

1818
async def get_task_data(self, task_id: TaskId) -> TaskData | None:

packages/service-library/src/servicelib/long_running_tasks/_store/redis.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ async def setup(self) -> None:
2525
self.redis_settings.build_redis_dsn(RedisDatabase.LONG_RUNNING_TASKS),
2626
client_name=f"long_running_tasks_store_{self.namespace}",
2727
)
28+
await self._client.setup()
2829

29-
async def teardown(self) -> None:
30+
async def shutdown(self) -> None:
3031
if self._client:
3132
await self._client.shutdown()
3233

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ async def setup(self) -> None:
132132
self.redis_settings.build_redis_dsn(RedisDatabase.LOCKS),
133133
client_name=f"long_running_tasks_store_{self.namespace}_lock",
134134
)
135+
await self.redis_client_sdk.setup()
135136

136137
self._stale_tasks_monitor_task = create_periodic_task(
137138
task=exclusive(
@@ -148,7 +149,6 @@ async def setup(self) -> None:
148149
)
149150

150151
async def teardown(self) -> None:
151-
152152
for tracked_task in await self._tasks_data.list_tasks_data():
153153
# when closing we do not care about pending errors
154154
await self.remove_task(
@@ -167,11 +167,11 @@ async def teardown(self) -> None:
167167
self._cancelled_tasks_removal_task, max_delay=_CANCEL_TASK_TIMEOUT
168168
)
169169

170-
await self._tasks_data.teardown()
171-
172-
if self.redis_client_sdk:
170+
if self.redis_client_sdk is not None:
173171
await self.redis_client_sdk.shutdown()
174172

173+
await self._tasks_data.shutdown()
174+
175175
async def _stale_tasks_monitor_worker(self) -> None:
176176
"""
177177
A task is considered stale, if the task status is not queried

packages/service-library/src/servicelib/redis/_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def __post_init__(self) -> None:
6363
self._is_healthy = False
6464
self._health_check_task_started_event = asyncio.Event()
6565

66+
async def setup(self) -> None:
6667
@periodic(interval=self.health_check_interval)
6768
async def _periodic_check_health() -> None:
6869
assert self._health_check_task_started_event # nosec
@@ -74,6 +75,9 @@ async def _periodic_check_health() -> None:
7475
name=f"redis_service_health_check_{self.redis_dsn}__{uuid4()}",
7576
)
7677

78+
assert self._health_check_task_started_event # nosec
79+
await self._health_check_task_started_event.wait()
80+
7781
_logger.info(
7882
"Connection to %s succeeded with %s",
7983
f"redis at {self.redis_dsn=}",

packages/service-library/src/servicelib/redis/_clients_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ async def setup(self) -> None:
2727
health_check_interval=config.health_check_interval,
2828
client_name=f"{self.client_name}",
2929
)
30+
await self._client_sdks[config.database].setup()
3031

3132
async def shutdown(self) -> None:
3233
await asyncio.gather(

packages/service-library/tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ async def _(
8686
client = RedisClientSDK(
8787
redis_resources_dns, decode_responses=decode_response, client_name="pytest"
8888
)
89+
await client.setup()
8990
assert client
9091
assert client.redis_dsn == redis_resources_dns
9192
assert client.client_name == "pytest"

packages/service-library/tests/deferred_tasks/example_app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ def __init__(
9595
)
9696

9797
async def setup(self) -> None:
98+
await self._redis_client.setup()
9899
await self._manager.setup()
99100

100101

0 commit comments

Comments
 (0)