30 changes: 29 additions & 1 deletion packages/service-library/src/servicelib/rabbitmq/_utils.py
@@ -1,5 +1,9 @@
import logging
import random
import socket
import string
from pathlib import Path
from tempfile import gettempdir
from typing import Any, Final

import aio_pika
@@ -19,6 +23,12 @@

RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS: Final[int] = 15 * _MINUTE * 1000

_PATH_UNIQUE_RABBIT_QUEUE_PREFIX: Final[Path] = (
    Path(gettempdir()) / f"{__name__}_unique_rabbit_queue_prefix"
)
_ALPHABET: Final[str] = string.ascii_letters + string.digits
_CHAR_COUNT: Final[NonNegativeInt] = 6


class RabbitMQRetryPolicyUponInitialization:
"""Retry policy upon service initialization"""
@@ -50,8 +60,26 @@ async def wait_till_rabbitmq_responsive(url: str) -> bool:
    return await is_rabbitmq_responsive(url)


def _get_unique_rabbit_queue_name_prefix() -> str:
    # NOTE: this prefix is guaranteed to be unique for the entire lifecycle of the docker container
    # Why is this desirable?
    # 1. the code base makes the above assumption, otherwise subscribers and consumers do not work
    # 2. enables restartability of production deployments, where containers are first created and
    #    then killed; avoids overlapping queue names and errors during start
    prefix: str | None = None
    if _PATH_UNIQUE_RABBIT_QUEUE_PREFIX.exists():
        prefix = _PATH_UNIQUE_RABBIT_QUEUE_PREFIX.read_text()

    if prefix is None:
        random_str = "".join(random.choices(_ALPHABET, k=_CHAR_COUNT))  # noqa: S311
        prefix = f"{socket.gethostname()}_{random_str}"
        _PATH_UNIQUE_RABBIT_QUEUE_PREFIX.write_text(prefix)

    return prefix


def get_rabbitmq_client_unique_name(base_name: str) -> str:
-    return f"{base_name}_{socket.gethostname()}"
+    return f"{base_name}_{_get_unique_rabbit_queue_name_prefix()}"


async def declare_queue(
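
For context, a minimal usage sketch of the behavior introduced by this change; it is not part of the diff. The import path servicelib.rabbitmq._utils is inferred from the file location and the base name "example-service" is made up.

# Illustrative sketch, not part of this PR: assumes servicelib.rabbitmq._utils
# is importable with the package layout shown above and uses a made-up base name.
from servicelib.rabbitmq._utils import get_rabbitmq_client_unique_name

# Within a single container the prefix is cached in a temp file, so repeated
# calls (even across process restarts inside the same container) return the
# same name.
name_a = get_rabbitmq_client_unique_name("example-service")
name_b = get_rabbitmq_client_unique_name("example-service")
assert name_a == name_b  # e.g. "example-service_<hostname>_<6 random chars>"

# A fresh container starts with an empty temp dir, so it generates a new random
# suffix and does not clash with queues left behind by a previous container.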