Skip to content

Commit b253989

Browse files
committed
updated services
1 parent a60bfc3 commit b253989

File tree

5 files changed

+33
-35
lines changed

5 files changed

+33
-35
lines changed

services/api-server/src/simcore_service_api_server/services/log_streaming.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
from pydantic import NonNegativeInt
1111
from servicelib.logging_errors import create_troubleshotting_log_kwargs
1212
from servicelib.logging_utils import log_catch
13-
from servicelib.rabbitmq import RabbitMQClient
14-
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
15-
from simcore_service_api_server.models.schemas.errors import ErrorGet
13+
from servicelib.rabbitmq import QueueName, RabbitMQClient
1614

1715
from .._constants import MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE
16+
from ..exceptions.backend_errors import BaseBackEndError
1817
from ..exceptions.log_streaming_errors import (
1918
LogStreamerNotRegisteredError,
2019
LogStreamerRegistionConflictError,
2120
)
21+
from ..models.schemas.errors import ErrorGet
2222
from ..models.schemas.jobs import JobID, JobLog
2323
from .director_v2 import DirectorV2Api
2424

@@ -31,10 +31,10 @@ class LogDistributor:
3131
def __init__(self, rabbitmq_client: RabbitMQClient):
3232
self._rabbit_client = rabbitmq_client
3333
self._log_streamers: dict[JobID, Queue[JobLog]] = {}
34-
self._queue_name: str
34+
self._queue_name: QueueName
3535

3636
async def setup(self):
37-
self._queue_name = await self._rabbit_client.subscribe(
37+
self._queue_name, _ = await self._rabbit_client.subscribe(
3838
LoggerRabbitMessage.get_channel_name(),
3939
self._distribute_logs,
4040
exclusive_queue=True,
@@ -123,7 +123,7 @@ async def log_generator(self) -> AsyncIterable[str]:
123123
self._queue.get(), timeout=self._log_check_timeout
124124
)
125125
yield log.model_dump_json() + _NEW_LINE
126-
except asyncio.TimeoutError:
126+
except TimeoutError:
127127
done = await self._project_done()
128128

129129
except BaseBackEndError as exc:

services/payments/src/simcore_service_payments/services/auto_recharge_listener.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,44 +4,44 @@
44
from fastapi import FastAPI
55
from models_library.rabbitmq_messages import WalletCreditsMessage
66
from servicelib.logging_utils import log_context
7-
from servicelib.rabbitmq import RabbitMQClient
7+
from servicelib.rabbitmq import ConsumerTag, QueueName
88

99
from .auto_recharge_process_message import process_message
1010
from .rabbitmq import get_rabbitmq_client
1111

1212
_logger = logging.getLogger(__name__)
1313

1414

15-
async def _subscribe_to_rabbitmq(app) -> str:
15+
async def _subscribe_to_rabbitmq(app) -> tuple[QueueName, ConsumerTag]:
1616
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"):
17-
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
18-
subscribed_queue: str = await rabbit_client.subscribe(
17+
rabbit_client = get_rabbitmq_client(app)
18+
return await rabbit_client.subscribe(
1919
WalletCreditsMessage.get_channel_name(),
2020
message_handler=functools.partial(process_message, app),
2121
exclusive_queue=False,
2222
topics=["#"],
2323
)
24-
return subscribed_queue
2524

2625

27-
async def _unsubscribe_consumer(app) -> None:
26+
async def _unsubscribe_consumer(
27+
app, queue_name: QueueName, consumer_tag: ConsumerTag
28+
) -> None:
2829
with log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq queue"):
29-
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
30-
await rabbit_client.unsubscribe_consumer(
31-
WalletCreditsMessage.get_channel_name(),
32-
)
30+
rabbit_client = get_rabbitmq_client(app)
31+
await rabbit_client.unsubscribe_consumer(queue_name, consumer_tag)
3332

3433

3534
def setup_auto_recharge_listener(app: FastAPI):
36-
async def _on_startup():
35+
async def _on_startup() -> None:
3736
app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app)
3837

39-
async def _on_shutdown():
38+
async def _on_shutdown() -> None:
4039
assert app.state.auto_recharge_rabbitmq_consumer # nosec
40+
assert isinstance(app.state.auto_recharge_rabbitmq_consumer, tuple) # nosec
4141
if app.state.rabbitmq_client:
4242
# NOTE: We want to have persistent queue, therefore we will unsubscribe only consumer
43-
await _unsubscribe_consumer(app)
44-
app.state.auto_recharge_rabbitmq_constumer = None
43+
await _unsubscribe_consumer(app, *app.state.auto_recharge_rabbitmq_consumer)
44+
app.state.auto_recharge_rabbitmq_consumer = None
4545

4646
app.add_event_handler("startup", _on_startup)
4747
app.add_event_handler("shutdown", _on_shutdown)

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers_common.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from aiohttp import web
77
from servicelib.logging_utils import log_context
8-
from servicelib.rabbitmq import RabbitMQClient
8+
from servicelib.rabbitmq import ConsumerTag, ExchangeName, QueueName, RabbitMQClient
99
from servicelib.utils import logged_gather
1010

1111
from ..rabbitmq import get_rabbitmq_client
@@ -25,10 +25,10 @@ async def subscribe_to_rabbitmq(
2525
SubcribeArgumentsTuple,
2626
...,
2727
],
28-
) -> dict[str, str]:
28+
) -> dict[ExchangeName, tuple[QueueName, ConsumerTag]]:
2929
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channels"):
3030
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
31-
subscribed_queues = await logged_gather(
31+
subscribed_queue_consumer_mappings = await logged_gather(
3232
*(
3333
rabbit_client.subscribe(
3434
p.exchange_name,
@@ -40,8 +40,8 @@ async def subscribe_to_rabbitmq(
4040
reraise=True,
4141
)
4242
return {
43-
exchange_name: queue_name
44-
for (exchange_name, *_), queue_name in zip(
45-
exchange_to_parser_config, subscribed_queues, strict=True
43+
exchange_name: queue_consumer_map
44+
for (exchange_name, *_), queue_consumer_map in zip(
45+
exchange_to_parser_config, subscribed_queue_consumer_mappings, strict=True
4646
)
4747
}

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,10 @@ async def _convert_to_node_update_event(
6565

6666

6767
async def _progress_message_parser(app: web.Application, data: bytes) -> bool:
68-
rabbit_message: (
69-
ProgressRabbitMessageNode | ProgressRabbitMessageProject
70-
) = TypeAdapter(
71-
ProgressRabbitMessageNode | ProgressRabbitMessageProject
72-
).validate_json(
73-
data
68+
rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = (
69+
TypeAdapter(
70+
ProgressRabbitMessageNode | ProgressRabbitMessageProject
71+
).validate_json(data)
7472
)
7573
message: SocketMessageDict | None = None
7674
if isinstance(rabbit_message, ProgressRabbitMessageProject):
@@ -183,7 +181,7 @@ async def _unsubscribe_from_rabbitmq(app) -> None:
183181
await logged_gather(
184182
*(
185183
rabbit_client.unsubscribe(queue_name)
186-
for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
184+
for queue_name, _ in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
187185
),
188186
)
189187

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_nonexclusive_queue_consumers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ async def _unsubscribe_from_rabbitmq(app) -> None:
5959
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
6060
await logged_gather(
6161
*(
62-
rabbit_client.unsubscribe_consumer(queue_name)
63-
for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
62+
rabbit_client.unsubscribe_consumer(*queue_consumer_map)
63+
for queue_consumer_map in app[_APP_RABBITMQ_CONSUMERS_KEY].values()
6464
),
6565
)
6666

0 commit comments

Comments
 (0)