Merged
Changes from 15 commits
1 change: 1 addition & 0 deletions .github/workflows/ci-arm-build.yml
@@ -14,6 +14,7 @@ on:
jobs:
build-and-push-arm64:
if: github.event_name == 'workflow_dispatch'
timeout-minutes: 60 # intentionally long to allow for slow builds
runs-on: ubuntu-latest
strategy:
matrix:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -30,7 +30,7 @@ repos:
name: upgrade code
exclude: ^scripts/maintenance/computational-clusters/autoscaled_monitor/cli\.py$ # Optional get replaced and typer does not like it
- repo: https://github.com/hadialqattan/pycln
rev: v2.1.4
rev: v2.5.0
hooks:
- id: pycln
args: [--all, --expand-stars]
2 changes: 1 addition & 1 deletion packages/aws-library/requirements/_base.txt
@@ -294,7 +294,7 @@ pyyaml==6.0.2
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# -r requirements/../../../packages/service-library/requirements/_base.in
redis==5.0.4
redis==5.2.1
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
5 changes: 2 additions & 3 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
@@ -10,6 +10,7 @@
import tenacity
from pytest_mock import MockerFixture
from redis.asyncio import Redis, from_url
from servicelib.redis import _constants as redis_constants
from settings_library.basic_types import PortInt
from settings_library.redis import RedisDatabase, RedisSettings
from tenacity.before_sleep import before_sleep_log
@@ -118,6 +119,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
@pytest.fixture
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
# lowered to allow CI to properly shutdown RedisClientSDK instances
from servicelib import redis

mocker.patch.object(redis, "_DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
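The fixture now patches `DEFAULT_SOCKET_TIMEOUT` on the new `servicelib.redis._constants` module instead of the old private `_DEFAULT_SOCKET_TIMEOUT` attribute. A minimal sketch of how a test could request it; the test body, the `redis_service` settings fixture, the `RedisDatabase.RESOURCES` member, and an async-capable test runner (e.g. pytest-asyncio) are assumptions for illustration, not part of this diff:

```python
from servicelib.redis import RedisClientSDK
from settings_library.redis import RedisDatabase, RedisSettings


async def test_client_shuts_down_quickly(
    mock_redis_socket_timeout: None,  # shrinks the socket timeout to 1s so CI teardown stays fast
    redis_service: RedisSettings,  # assumed fixture providing reachable Redis settings
) -> None:
    client = RedisClientSDK(
        redis_dsn=redis_service.build_redis_dsn(RedisDatabase.RESOURCES),
        client_name="pytest",
    )
    await client.setup()
    await client.shutdown()  # returns promptly thanks to the patched timeout
```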
2 changes: 1 addition & 1 deletion packages/service-library/requirements/_base.txt
@@ -210,7 +210,7 @@ pyyaml==6.0.2
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_base.in
redis==5.0.4
redis==5.2.1
# via
# -c requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/models-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
25 changes: 25 additions & 0 deletions packages/service-library/src/servicelib/redis/__init__.py
@@ -0,0 +1,25 @@
from ._client import RedisClientSDK
from ._clients_manager import RedisClientsManager
from ._decorators import exclusive
from ._distributed_locks_utils import start_exclusive_periodic_task
from ._errors import (
CouldNotAcquireLockError,
CouldNotConnectToRedisError,
LockLostError,
)
from ._models import RedisManagerDBConfig
from ._utils import handle_redis_returns_union_types

__all__: tuple[str, ...] = (
"CouldNotAcquireLockError",
"CouldNotConnectToRedisError",
"exclusive",
"handle_redis_returns_union_types",
"LockLostError",
"RedisClientSDK",
"RedisClientsManager",
"RedisManagerDBConfig",
"start_exclusive_periodic_task",
)

# nopycln: file
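With the helpers consolidated into the `servicelib.redis` package, the re-exports above (kept alive by the `# nopycln: file` marker so the pycln bump to v2.5.0 does not strip them) form the public import surface. A minimal sketch of what a caller's imports might look like after this change:

```python
from servicelib.redis import (
    CouldNotAcquireLockError,
    RedisClientSDK,
    RedisClientsManager,
    RedisManagerDBConfig,
    exclusive,
    start_exclusive_periodic_task,
)
```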
@@ -5,65 +5,38 @@
from asyncio import Task
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Final
from uuid import uuid4

import redis.asyncio as aioredis
import redis.exceptions
from common_library.errors_classes import OsparcErrorMixin
from pydantic import NonNegativeFloat, NonNegativeInt
from pydantic import NonNegativeFloat
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
from settings_library.redis import RedisDatabase, RedisSettings
from tenacity import retry
from yarl import URL

from .background_task import periodic_task
from .logging_utils import log_catch, log_context
from .retry_policies import RedisRetryPolicyUponInitialization

_DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
_DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)


_DEFAULT_DECODE_RESPONSES: Final[bool] = True
_DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(
seconds=5
from ..background_task import periodic_task
from ..logging_utils import log_catch
from ..retry_policies import RedisRetryPolicyUponInitialization
from ._constants import (
DEFAULT_DECODE_RESPONSES,
DEFAULT_HEALTH_CHECK_INTERVAL,
DEFAULT_LOCK_TTL,
DEFAULT_SOCKET_TIMEOUT,
)
_SHUTDOWN_TIMEOUT_S: Final[NonNegativeInt] = 5

from ._errors import CouldNotAcquireLockError, CouldNotConnectToRedisError
from ._utils import auto_extend_lock, cancel_or_warn

_logger = logging.getLogger(__name__)


class BaseRedisError(OsparcErrorMixin, RuntimeError):
...


class CouldNotAcquireLockError(BaseRedisError):
msg_template: str = "Lock {lock.name} could not be acquired!"


class CouldNotConnectToRedisError(BaseRedisError):
msg_template: str = "Connection to '{dsn}' failed"


async def _cancel_or_warn(task: Task) -> None:
if not task.cancelled():
task.cancel()
_, pending = await asyncio.wait((task,), timeout=_SHUTDOWN_TIMEOUT_S)
if pending:
task_name = task.get_name()
_logger.warning("Could not cancel task_name=%s pending=%s", task_name, pending)


@dataclass
class RedisClientSDK:
redis_dsn: str
client_name: str
decode_responses: bool = _DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL
decode_responses: bool = DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = DEFAULT_HEALTH_CHECK_INTERVAL

_client: aioredis.Redis = field(init=False)
_health_check_task: Task | None = None
@@ -74,7 +47,7 @@ class RedisClientSDK:
def redis(self) -> aioredis.Redis:
return self._client

def __post_init__(self):
def __post_init__(self) -> None:
self._client = aioredis.from_url(
self.redis_dsn,
# Run 3 retries with exponential backoff strategy source: https://redis.readthedocs.io/en/stable/backoff.html
@@ -84,8 +57,8 @@ def __post_init__(self):
redis.exceptions.ConnectionError,
redis.exceptions.TimeoutError,
],
socket_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_connect_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_connect_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
encoding="utf-8",
decode_responses=self.decode_responses,
client_name=self.client_name,
@@ -113,7 +86,7 @@ async def setup(self) -> None:
async def shutdown(self) -> None:
if self._health_check_task:
self._continue_health_checking = False
await _cancel_or_warn(self._health_check_task)
await cancel_or_warn(self._health_check_task)
self._health_check_task = None

await self._client.aclose(close_connection_pool=True)
@@ -165,7 +138,7 @@ async def lock_context(
2. `blocking==True` times out while waiting for the lock to be free (another entity holds the lock)
"""

total_lock_duration: datetime.timedelta = _DEFAULT_LOCK_TTL
total_lock_duration: datetime.timedelta = DEFAULT_LOCK_TTL
lock_unique_id = f"lock_extender_{lock_key}_{uuid4()}"

ttl_lock: Lock = self._client.lock(
@@ -178,15 +151,9 @@
if not await ttl_lock.acquire(token=lock_value):
raise CouldNotAcquireLockError(lock=ttl_lock)

async def _extend_lock(lock: Lock) -> None:
with log_context(
_logger, logging.DEBUG, f"Extending lock {lock_unique_id}"
), log_catch(_logger, reraise=False):
await lock.reacquire()

try:
async with periodic_task(
_extend_lock,
auto_extend_lock,
interval=total_lock_duration / 2,
task_name=lock_unique_id,
lock=ttl_lock,
@@ -224,51 +191,3 @@ async def _extend_lock(lock: Lock) -> None:
async def lock_value(self, lock_name: str) -> str | None:
output: str | None = await self._client.get(lock_name)
return output


@dataclass(frozen=True)
class RedisManagerDBConfig:
database: RedisDatabase
decode_responses: bool = _DEFAULT_DECODE_RESPONSES
health_check_interval: datetime.timedelta = _DEFAULT_HEALTH_CHECK_INTERVAL


@dataclass
class RedisClientsManager:
"""
Manages the lifetime of redis client sdk connections
"""

databases_configs: set[RedisManagerDBConfig]
settings: RedisSettings
client_name: str

_client_sdks: dict[RedisDatabase, RedisClientSDK] = field(default_factory=dict)

async def setup(self) -> None:
for config in self.databases_configs:
self._client_sdks[config.database] = RedisClientSDK(
redis_dsn=self.settings.build_redis_dsn(config.database),
decode_responses=config.decode_responses,
health_check_interval=config.health_check_interval,
client_name=f"{self.client_name}",
)

for client in self._client_sdks.values():
await client.setup()

async def shutdown(self) -> None:
# NOTE: somehow using logged_gather is not an option
# doing so will make the shutdown procedure hang
for client in self._client_sdks.values():
await client.shutdown()

def client(self, database: RedisDatabase) -> RedisClientSDK:
return self._client_sdks[database]

async def __aenter__(self) -> "RedisClientsManager":
await self.setup()
return self

async def __aexit__(self, *args):
await self.shutdown()
@@ -0,0 +1,47 @@
from dataclasses import dataclass, field

from settings_library.redis import RedisDatabase, RedisSettings

from ._client import RedisClientSDK
from ._models import RedisManagerDBConfig


@dataclass
class RedisClientsManager:
"""
Manages the lifetime of redis client sdk connections
"""

databases_configs: set[RedisManagerDBConfig]
settings: RedisSettings
client_name: str

_client_sdks: dict[RedisDatabase, RedisClientSDK] = field(default_factory=dict)

async def setup(self) -> None:
for config in self.databases_configs:
self._client_sdks[config.database] = RedisClientSDK(
redis_dsn=self.settings.build_redis_dsn(config.database),
decode_responses=config.decode_responses,
health_check_interval=config.health_check_interval,
client_name=f"{self.client_name}",
)

for client in self._client_sdks.values():
await client.setup()

async def shutdown(self) -> None:
# NOTE: somehow using logged_gather is not an option
# doing so will make the shutdown procedure hang
for client in self._client_sdks.values():
await client.shutdown()

def client(self, database: RedisDatabase) -> RedisClientSDK:
return self._client_sdks[database]

async def __aenter__(self) -> "RedisClientsManager":
await self.setup()
return self

async def __aexit__(self, *args) -> None:
await self.shutdown()
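`RedisClientsManager` moves into its own `_clients_manager` module essentially unchanged, so its lifecycle stays the same: one `RedisClientSDK` per configured database, all set up and shut down together, optionally via the async context manager. A small sketch; the `RedisDatabase.LOCKS` member and the settings wiring are assumptions, not part of this diff:

```python
from servicelib.redis import RedisClientsManager, RedisManagerDBConfig
from settings_library.redis import RedisDatabase, RedisSettings


async def ping_locks_db(settings: RedisSettings) -> None:
    async with RedisClientsManager(
        databases_configs={RedisManagerDBConfig(database=RedisDatabase.LOCKS)},
        settings=settings,
        client_name="example-service",
    ) as manager:
        # one RedisClientSDK per configured database
        locks_client = manager.client(RedisDatabase.LOCKS)
        await locks_client.redis.ping()
```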
12 changes: 12 additions & 0 deletions packages/service-library/src/servicelib/redis/_constants.py
@@ -0,0 +1,12 @@
import datetime
from typing import Final

from pydantic import NonNegativeInt

DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)


DEFAULT_DECODE_RESPONSES: Final[bool] = True
DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)
SHUTDOWN_TIMEOUT_S: Final[NonNegativeInt] = 5
56 changes: 56 additions & 0 deletions packages/service-library/src/servicelib/redis/_decorators.py
@@ -0,0 +1,56 @@
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar

from ._client import RedisClientSDK

_logger = logging.getLogger(__file__)

P = ParamSpec("P")
R = TypeVar("R")


def exclusive(
redis: RedisClientSDK | Callable[..., RedisClientSDK],
*,
lock_key: str | Callable[..., str],
lock_value: bytes | str | None = None,
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
"""
Define a method to run exclusively across
processes by leveraging a Redis Lock.

parameters:
redis: the redis client SDK
lock_key: a string as the name of the lock (good practice: app_name:lock_name)
lock_value: some additional data that can be retrieved by another client

Raises:
- ValueError if used incorrectly
- CouldNotAcquireLockError if the lock could not be acquired
"""

if not lock_key:
msg = "lock_key cannot be empty string!"
raise ValueError(msg)

def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
redis_lock_key = (
lock_key(*args, **kwargs) if callable(lock_key) else lock_key
)
assert isinstance(redis_lock_key, str) # nosec

redis_client = redis(*args, **kwargs) if callable(redis) else redis
assert isinstance(redis_client, RedisClientSDK) # nosec

async with redis_client.lock_context(
lock_key=redis_lock_key, lock_value=lock_value
):
return await func(*args, **kwargs)

return wrapper

return decorator
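The new `exclusive` decorator wraps a coroutine in `lock_context`, so the decorated function runs on at most one process at a time; both `redis` and `lock_key` may also be callables resolved from the wrapped function's arguments. A minimal sketch under the assumption that a `RedisClientSDK` is available when the module loads (the client, DSN, and key names below are illustrative):

```python
from servicelib.redis import RedisClientSDK, exclusive

# illustrative client; in a service this would normally come from the app's state
_client = RedisClientSDK(
    redis_dsn="redis://localhost:6379/0",
    client_name="example-worker",
)


@exclusive(_client, lock_key="example-app:prune_stale_entries")
async def prune_stale_entries() -> None:
    """Raises CouldNotAcquireLockError if another process already holds the lock."""


@exclusive(_client, lock_key=lambda *, user_id: f"example-app:process_user:{user_id}")
async def process_user(*, user_id: int) -> None:
    """The lock key is derived from the call, so different users can run concurrently."""
```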