Skip to content

Commit e7018c7

Browse files
authored
♻️Rabbitmq rpc: response shall not be jsonized (#4730)
1 parent 767354a commit e7018c7

File tree

8 files changed

+41
-25
lines changed

8 files changed

+41
-25
lines changed

.coveragerc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,8 @@ exclude_lines =
2323
if 0:
2424
if __name__ == .__main__.:
2525

26+
# Don't complain about abstract methods, they aren't run:
27+
@(abc\.)?abstractmethod
28+
2629
ignore_errors = True
2730
show_missing = True

.ruff.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ target-version = "py310"
5151

5252

5353
[per-file-ignores]
54-
"{**/{tests, pytest_simcore}/**}" = [
54+
"**/{tests,pytest_simcore}/**" = [
5555
"T201", # print found
5656
"ARG001", # unused function argument
5757
"PT019", # user pytest.mark.usefixture

packages/service-library/src/servicelib/rabbitmq/_errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@ class RemoteMethodNotRegisteredError(BaseRPCError):
2020
"Could not find a remote method named: '{method_name}'. "
2121
"Message from remote server was returned: {incoming_message}. "
2222
)
23+
24+
25+
class RPCServerError(BaseRPCError):
26+
msg_template = (
27+
"Unhandled error while running method '{exc_type}:{method_name}': '{msg}'"
28+
)

packages/service-library/src/servicelib/rabbitmq/_rpc_router.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
import asyncio
12
import functools
23
import logging
34
from collections.abc import Callable
45
from dataclasses import dataclass, field
56
from typing import Any, TypeVar
67

7-
import orjson
8-
from models_library.utils.fastapi_encoders import jsonable_encoder
98
from pydantic import SecretStr
109

11-
from ..logging_utils import log_catch, log_context
10+
from ..logging_utils import log_context
11+
from ._errors import RPCServerError
1212
from ._models import RPCMethodName
1313

1414
DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any])
@@ -32,14 +32,18 @@ async def wrapper(*args, **kwargs):
3232
_logger,
3333
logging.INFO,
3434
msg=f"calling {func.__name__} with {args}, {kwargs}",
35-
), log_catch(_logger, reraise=True):
36-
result = await func(*args, **kwargs)
37-
return orjson.dumps(
38-
jsonable_encoder(
39-
result,
40-
custom_encoder=_RPC_CUSTOM_ENCODER,
41-
)
42-
)
35+
):
36+
try:
37+
result = await func(*args, **kwargs)
38+
return result
39+
except asyncio.CancelledError:
40+
_logger.debug("call was cancelled")
41+
raise
42+
except Exception as exc: # pylint: disable=broad-except
43+
_logger.exception("Unhandled exception:")
44+
raise RPCServerError(
45+
method_name=func.__name__, exc_type=type(exc), msg=f"{exc}"
46+
) from exc
4347

4448
self.routes[RPCMethodName(func.__name__)] = wrapper
4549
return func

packages/service-library/tests/rabbitmq/test_rabbitmq_rpc_router.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from collections.abc import Awaitable, Callable
55

6-
import orjson
76
import pytest
87
from faker import Faker
98
from servicelib.rabbitmq import (
@@ -59,21 +58,21 @@ async def test_exposed_methods(
5958
router, router_namespace, a_arg, a_global_kwarg=a_kwargs
6059
)
6160

62-
json_result = await rpc_client.request(
61+
rpc_result = await rpc_client.request(
6362
router_namespace,
6463
RPCMethodName(a_str_method.__name__),
6564
a_specific_kwarg=a_specific_kwarg,
6665
)
67-
assert isinstance(json_result, bytes)
68-
result = orjson.loads(json_result)
66+
assert isinstance(rpc_result, str)
67+
result = rpc_result
6968
assert result == f"{a_arg}, that was a winner! {a_kwargs} {a_specific_kwarg}"
7069

71-
json_result = await rpc_client.request(
70+
rpc_result = await rpc_client.request(
7271
router_namespace,
7372
RPCMethodName(an_int_method.__name__),
7473
)
75-
assert isinstance(json_result, bytes)
76-
result = orjson.loads(json_result)
74+
assert isinstance(rpc_result, int)
75+
result = rpc_result
7776
assert result == 34
7877

7978
with pytest.raises(RuntimeError):

scripts/common.Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ hel%:
7272
@echo ""
7373

7474

75+
.env: .env-devel ## creates .env file from defaults in .env-devel
76+
$(if $(wildcard $@), \
77+
@echo "WARNING ##### $< is newer than $@ ####"; diff -uN $@ $<; false;,\
78+
@echo "WARNING ##### $@ does not exist, cloning $< as $@ ############"; cp $< $@)
79+
80+
7581
.PHONY: devenv
7682
devenv: ## build development environment
7783
@$(MAKE_C) $(REPO_BASE_DIR) $@

services/clusters-keeper/Makefile

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@
44
include ../../scripts/common.Makefile
55
include ../../scripts/common-service.Makefile
66

7-
.env: .env-devel ## creates .env file from defaults in .env-devel
8-
$(if $(wildcard $@), \
9-
@echo "WARNING ##### $< is newer than $@ ####"; diff -uN $@ $<; false;,\
10-
@echo "WARNING ##### $@ does not exist, cloning $< as $@ ############"; cp $< $@)
117

128

139
.PHONY: test-local

services/clusters-keeper/tests/unit/test_rpc_clusters.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ async def test_get_or_create_cluster(
152152
wallet_id=wallet_id,
153153
)
154154
assert rpc_response
155-
created_cluster = ClusterGet.parse_raw(rpc_response)
155+
assert isinstance(rpc_response, ClusterGet)
156+
created_cluster = rpc_response
156157
# check we do have a new machine in AWS
157158
await _assert_cluster_instance_created(ec2_client, user_id, wallet_id)
158159
# it is called once as moto server creates instances instantly
@@ -167,7 +168,8 @@ async def test_get_or_create_cluster(
167168
wallet_id=wallet_id,
168169
)
169170
assert rpc_response
170-
returned_cluster = ClusterGet.parse_raw(rpc_response)
171+
assert isinstance(rpc_response, ClusterGet)
172+
returned_cluster = rpc_response
171173
# check we still have only 1 instance
172174
await _assert_cluster_heartbeat_on_instance(ec2_client)
173175
mocked_dask_ping_gateway.ping_gateway.assert_called_once()

0 commit comments

Comments
 (0)