Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ def _load_from_redis_hash(data: dict[str, str]) -> dict[str, Any]:
return {k: json_loads(v) for k, v in data.items()}


def to_redis_namespace(lrt_namespace: LRTNamespace) -> str:
return lrt_namespace.upper()


class RedisStore:
def __init__(self, redis_settings: RedisSettings, namespace: LRTNamespace):
def __init__(self, redis_settings: RedisSettings, lrt_namespace: LRTNamespace):
self.redis_settings = redis_settings
self.namespace: LRTNamespace = namespace.upper()
self.redis_namespace = to_redis_namespace(lrt_namespace)

self._client: RedisClientSDK | None = None

async def setup(self) -> None:
self._client = RedisClientSDK(
self.redis_settings.build_redis_dsn(RedisDatabase.LONG_RUNNING_TASKS),
client_name=f"long_running_tasks_store_{self.namespace}",
client_name=f"long_running_tasks_store_{self.redis_namespace}",
)
await self._client.setup()

Expand All @@ -47,10 +51,10 @@ def _redis(self) -> aioredis.Redis:
return self._client.redis

def _get_redis_key_task_data_match(self) -> str:
return f"{self.namespace}:{_STORE_TYPE_TASK_DATA}*"
return f"{self.redis_namespace}:{_STORE_TYPE_TASK_DATA}*"

def _get_redis_task_data_key(self, task_id: TaskId) -> str:
return f"{self.namespace}:{_STORE_TYPE_TASK_DATA}:{task_id}"
return f"{self.redis_namespace}:{_STORE_TYPE_TASK_DATA}:{task_id}"

async def get_task_data(self, task_id: TaskId) -> TaskData | None:
result: dict[str, Any] = await handle_redis_returns_union_types(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ..logging_utils import log_context
from ..redis._client import RedisClientSDK
from ._redis_store import to_redis_namespace
from .models import LRTNamespace

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -32,15 +33,16 @@ def _redis(self) -> aioredis.Redis:
assert self._client # nosec
return self._client.redis

async def cleanup(self, lrt_namespace: LRTNamespace) -> None:
async def cleanup(self, namespace: LRTNamespace) -> None:
"""removes Redis keys associated to the LRTNamespace if they exist"""
redis_namespace = to_redis_namespace(namespace)
keys_to_remove: list[str] = [
x async for x in self._redis.scan_iter(f"{lrt_namespace}*")
x async for x in self._redis.scan_iter(f"{redis_namespace}*")
]
with log_context(
_logger,
logging.DEBUG,
msg=f"Removing {keys_to_remove=} from Redis for {lrt_namespace=}",
logging.INFO,
msg=f"Removing {keys_to_remove=} from Redis for {redis_namespace=}",
):
if len(keys_to_remove) > 0:
await self._redis.delete(*keys_to_remove)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def store(
[RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
],
) -> AsyncIterable[RedisStore]:
store = RedisStore(redis_settings=use_in_memory_redis, namespace="test")
store = RedisStore(redis_settings=use_in_memory_redis, lrt_namespace="test")

await store.setup()
yield store
Expand Down Expand Up @@ -78,7 +78,7 @@ async def redis_stores(
],
) -> AsyncIterable[list[RedisStore]]:
stores: list[RedisStore] = [
RedisStore(redis_settings=use_in_memory_redis, namespace=f"test-{i}")
RedisStore(redis_settings=use_in_memory_redis, lrt_namespace=f"test-{i}")
for i in range(5)
]
for store in stores:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from copy import deepcopy

import pytest
from faker import Faker
from pydantic import TypeAdapter
from servicelib.long_running_tasks._redis_store import RedisStore
from servicelib.long_running_tasks.long_running_client_helper import (
Expand All @@ -23,8 +24,8 @@ def task_data() -> TaskData:


@pytest.fixture
def lrt_namespace() -> LRTNamespace:
return "TEST-NAMESPACE"
def lrt_namespace(faker: Faker) -> LRTNamespace:
return TypeAdapter(LRTNamespace).validate_python(f"test-namespace:{faker.uuid4()}")


@pytest.fixture
Expand All @@ -35,7 +36,7 @@ async def store(
],
lrt_namespace: LRTNamespace,
) -> AsyncIterable[RedisStore]:
store = RedisStore(redis_settings=use_in_memory_redis, namespace=lrt_namespace)
store = RedisStore(redis_settings=use_in_memory_redis, lrt_namespace=lrt_namespace)

await store.setup()
yield store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,13 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes(
await app.state.dynamic_sidecar_scheduler.scheduler.remove_service_from_observation(
scheduler_data.node_uuid
)

await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)

await task_progress.update(
message="finished removing resources", percent=ProgressPercent(1)
)

await _cleanup_long_running_tasks(app, scheduler_data.node_uuid)


async def _cleanup_long_running_tasks(app: FastAPI, node_id: NodeID) -> None:
long_running_client_helper = get_long_running_client_helper(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,8 +1071,10 @@ async def test_wait_for_manual_intervention(
await _ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)

# even if cancelled, state of waiting for manual intervention remains the same
with pytest.raises(CannotCancelWhileWaitingForManualInterventionError):
await cancel_operation(selected_app, schedule_id)
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
with attempt: # noqa: SIM117
with pytest.raises(CannotCancelWhileWaitingForManualInterventionError):
await cancel_operation(selected_app, schedule_id)

await _ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)

Expand Down
Loading