Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/celery-library/src/celery_library/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ async def create_task_manager(
client_name="celery_tasks",
)

# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159
await redis_client_sdk.setup()

return CeleryTaskManager(
app,
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def redis_client_sdk_lifespan(_: FastAPI, state: State) -> AsyncIterator[S
redis_dsn_with_secrets,
client_name=redis_state.REDIS_CLIENT_NAME,
)
await redis_client.setup()

try:
yield {"REDIS_CLIENT_SDK": redis_client, **called_state}
Expand Down
20 changes: 13 additions & 7 deletions packages/service-library/src/servicelib/redis/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def redis(self) -> aioredis.Redis:
return self._client

def __post_init__(self) -> None:
self._health_check_task_started_event = asyncio.Event()
self._client = aioredis.from_url(
self.redis_dsn,
# Run 3 retries with exponential backoff strategy source: https://redis.readthedocs.io/en/stable/backoff.html
Expand All @@ -61,8 +62,8 @@ def __post_init__(self) -> None:
)
# NOTE: connection is done here already
self._is_healthy = False
self._health_check_task_started_event = asyncio.Event()

async def setup(self) -> None:
@periodic(interval=self.health_check_interval)
async def _periodic_check_health() -> None:
assert self._health_check_task_started_event # nosec
Expand All @@ -74,6 +75,12 @@ async def _periodic_check_health() -> None:
name=f"redis_service_health_check_{self.redis_dsn}__{uuid4()}",
)

# NOTE: this achieves 2 very important things:
# - ensure redis is working
# - before shutting down an initialized Redis connection it must
# make at least one call to the servicer, otherwise tests might hang
await self.ping()

_logger.info(
"Connection to %s succeeded with %s",
f"redis at {self.redis_dsn=}",
Expand All @@ -85,12 +92,10 @@ async def shutdown(self) -> None:
_logger, level=logging.DEBUG, msg=f"Shutdown RedisClientSDK {self}"
):
if self._health_check_task:
assert self._health_check_task_started_event # nosec
# NOTE: wait for the health check task to have started once before we can cancel it
await self._health_check_task_started_event.wait()
await cancel_wait_task(
self._health_check_task, max_delay=_HEALTHCHECK_TASK_TIMEOUT_S
)
with log_catch(_logger, reraise=False):
await cancel_wait_task(
self._health_check_task, max_delay=_HEALTHCHECK_TASK_TIMEOUT_S
)

await self._client.aclose(close_connection_pool=True)

Expand All @@ -99,6 +104,7 @@ async def ping(self) -> bool:
# NOTE: retry_* input parameters from aioredis.from_url do not apply for the ping call
await self._client.ping()
return True

return False

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def setup(self) -> None:
health_check_interval=config.health_check_interval,
client_name=f"{self.client_name}",
)
await self._client_sdks[config.database].setup()

async def shutdown(self) -> None:
await asyncio.gather(
Expand Down
2 changes: 2 additions & 0 deletions packages/service-library/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ async def _(
assert client.redis_dsn == redis_resources_dns
assert client.client_name == "pytest"

await client.setup()

yield client

await client.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(
)

async def setup(self) -> None:
await self._redis_client.setup()
await self._manager.setup()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async def redis_client_sdk(
decode_responses=False,
client_name="pytest",
)
await sdk.setup()
yield sdk
await sdk.shutdown()

Expand Down
2 changes: 2 additions & 0 deletions packages/service-library/tests/redis/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ async def test_redis_client_sdk_setup_shutdown(

# ensure health check task sets the health to True
client._is_healthy = False # noqa: SLF001

await client.setup()
async for attempt in AsyncRetrying(
wait=wait_fixed(0.1),
stop=stop_after_delay(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async def on_startup() -> None:
app.state.redis_client_sdk = RedisClientSDK(
redis_locks_dsn, client_name=APP_NAME
)
await app.state.redis_client_sdk.setup()

async def on_shutdown() -> None:
redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async def on_startup() -> None:
app.state.redis_client_sdk = RedisClientSDK(
redis_locks_dsn, client_name=APP_NAME
)
await app.state.redis_client_sdk.setup()

async def on_shutdown() -> None:
redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ async def redis_client_sdk(
client = RedisClientSDK(redis_resources_dns, client_name="pytest")
assert client
assert client.redis_dsn == redis_resources_dns
await client.setup()

# cleanup, previous run's leftovers
await client.redis.flushall()

Expand Down Expand Up @@ -328,8 +330,7 @@ async def test_no_redis_key_overlap_when_inheriting(
redis_client_sdk: RedisClientSDK,
component_using_random_text: ComponentUsingRandomText,
):
class ChildRandomTextResourcesManager(RandomTextResourcesManager):
...
class ChildRandomTextResourcesManager(RandomTextResourcesManager): ...

parent_manager = RandomTextResourcesManager(
redis_client_sdk, component_using_random_text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async def on_startup() -> None:
app.state.redis_lock_client_sdk = RedisClientSDK(
redis_locks_dsn, client_name=APP_NAME
)
await app.state.redis_lock_client_sdk.setup()

async def on_shutdown() -> None:
redis_lock_client_sdk: None | RedisClientSDK = app.state.redis_lock_client_sdk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async def on_startup() -> None:
app.state.redis_client_sdk = RedisClientSDK(
redis_locks_dsn, client_name=APP_NAME
)
await app.state.redis_client_sdk.setup()

async def on_shutdown() -> None:
with log_context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async def on_startup() -> None:
app.state.redis_client_sdk = RedisClientSDK(
redis_locks_dsn, client_name=APP_NAME
)
await app.state.redis_client_sdk.setup()

async def on_shutdown() -> None:
redis_client_sdk = app.state.redis_client_sdk
Expand Down
Loading