Skip to content
This repository was archived by the owner on Jan 28, 2022. It is now read-only.

Commit e3ede6e

Browse files
author
Sergio García Prado
authored
Merge pull request #433 from Clariteia/0.1.1
0.1.1
2 parents ec60b58 + 5900854 commit e3ede6e

File tree

14 files changed

+135
-25
lines changed

14 files changed

+135
-25
lines changed

HISTORY.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,9 @@ History
120120
------------------
121121

122122
* Add `minos-microservice-common>=0.2.0` compatibility.
123+
124+
0.1.1 (2021-11-09)
125+
------------------
126+
127+
* Add `REPLY_TOPIC_CONTEXT_VAR` and integrate with `DynamicHandlerPool`.
128+
* Add support for `post_fn` callbacks following the same strategy as in `pre_fn` callbacks.

minos/networks/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
__author__ = """Clariteia Devs"""
22
__email__ = "devs@clariteia.com"
3-
__version__ = "0.1.0"
3+
__version__ = "0.1.1"
44

55
from .brokers import (
6+
REPLY_TOPIC_CONTEXT_VAR,
67
Broker,
78
BrokerSetup,
89
CommandBroker,

minos/networks/brokers/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
CommandReplyBroker,
77
)
88
from .commands import (
9+
REPLY_TOPIC_CONTEXT_VAR,
910
CommandBroker,
1011
)
1112
from .events import (

minos/networks/brokers/commands.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
)
44

55
import logging
6+
from contextvars import (
7+
ContextVar,
8+
)
69
from typing import (
710
Any,
11+
Final,
812
Optional,
913
)
1014
from uuid import (
@@ -22,6 +26,8 @@
2226

2327
logger = logging.getLogger(__name__)
2428

29+
REPLY_TOPIC_CONTEXT_VAR: Final[ContextVar[Optional[str]]] = ContextVar("reply_topic", default=None)
30+
2531

2632
class CommandBroker(Broker):
2733
"""Minos Command Broker Class."""
@@ -57,8 +63,11 @@ async def send(
5763
the command is not authenticated.
5864
:return: This method does not return anything.
5965
"""
66+
if reply_topic is None:
67+
reply_topic = REPLY_TOPIC_CONTEXT_VAR.get()
6068
if reply_topic is None:
6169
reply_topic = self.default_reply_topic
70+
6271
command = Command(topic, data, saga, reply_topic, user)
6372
logger.info(f"Sending '{command!s}'...")
6473
return await self.enqueue(command.topic, command.avro_bytes)

minos/networks/decorators/builders.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
defaultdict,
66
)
77
from inspect import (
8-
iscoroutinefunction,
8+
isawaitable,
99
)
1010
from typing import (
1111
Awaitable,
@@ -112,34 +112,30 @@ def _build_one_class(
112112

113113
for name, decorators in mapping.items():
114114
for decorator in decorators:
115-
ans[decorator].add(self._build_one_method(class_, name, decorator.pre_fn_name))
115+
ans[decorator].add(self._build_one_method(class_, name, decorator.pre_fn_name, decorator.post_fn_name))
116116

117117
@staticmethod
118-
def _build_one_method(class_: type, name: str, pref_fn_name: str, **kwargs) -> Handler:
118+
def _build_one_method(class_: type, name: str, pref_fn_name: str, post_fn_name: str, **kwargs) -> Handler:
119119
instance = class_(**kwargs)
120120
fn = getattr(instance, name)
121121
pre_fn = getattr(instance, pref_fn_name, None)
122+
post_fn = getattr(instance, post_fn_name, None)
122123

123-
if iscoroutinefunction(fn):
124-
_awaitable_fn = fn
125-
else:
124+
async def _wrapped_fn(request: Request) -> Optional[Response]:
125+
if pre_fn is not None:
126+
request = pre_fn(request)
127+
if isawaitable(request):
128+
request = await request
126129

127-
async def _awaitable_fn(request: Request) -> Optional[Response]:
128-
return fn(request)
130+
response = fn(request)
131+
if isawaitable(response):
132+
response = await response
129133

130-
if pre_fn is None:
131-
_wrapped_fn = _awaitable_fn
132-
else:
133-
if iscoroutinefunction(pre_fn):
134+
if post_fn is not None:
135+
response = post_fn(response)
136+
if isawaitable(response):
137+
response = await response
134138

135-
async def _wrapped_fn(request: Request) -> Optional[Response]:
136-
request = await pre_fn(request)
137-
return await _awaitable_fn(request)
138-
139-
else:
140-
141-
async def _wrapped_fn(request: Request) -> Optional[Response]:
142-
request = pre_fn(request)
143-
return await _awaitable_fn(request)
139+
return response
144140

145141
return _wrapped_fn

minos/networks/decorators/definitions/abc.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,11 @@ def pre_fn_name(self) -> str:
8181
:return: A string value containing the function name.
8282
"""
8383
return self.KIND.pre_fn_name
84+
85+
@property
86+
def post_fn_name(self) -> str:
87+
"""Get the post execution function name.
88+
89+
:return: A string value containing the function name.
90+
"""
91+
return self.KIND.post_fn_name

minos/networks/decorators/definitions/kinds.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,16 @@ def pre_fn_name(self) -> str:
2323
self.Event: "_pre_event_handle",
2424
}
2525
return mapping[self]
26+
27+
@property
28+
def post_fn_name(self) -> str:
29+
"""Get the post execution function name.
30+
31+
:return: A string value containing the function name.
32+
"""
33+
mapping = {
34+
self.Command: "_post_command_handle",
35+
self.Query: "_post_query_handle",
36+
self.Event: "_post_event_handle",
37+
}
38+
return mapping[self]

minos/networks/handlers/dynamic/pools.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
)
44

55
import logging
6+
from contextvars import (
7+
Token,
8+
)
9+
from typing import (
10+
AsyncContextManager,
11+
Optional,
12+
)
613
from uuid import (
714
uuid4,
815
)
@@ -23,6 +30,9 @@
2330
MinosPool,
2431
)
2532

33+
from ...brokers import (
34+
REPLY_TOPIC_CONTEXT_VAR,
35+
)
2636
from ..consumers import (
2737
Consumer,
2838
)
@@ -82,3 +92,27 @@ async def _subscribe_reply_topic(self, topic: str) -> None:
8292

8393
async def _unsubscribe_reply_topic(self, topic: str) -> None:
8494
await self.consumer.remove_topic(topic)
95+
96+
def acquire(self, *args, **kwargs) -> AsyncContextManager:
97+
"""Acquire a new instance wrapped on an asynchronous context manager.
98+
99+
:return: An asynchronous context manager.
100+
"""
101+
return _ReplyTopicContextManager(super().acquire())
102+
103+
104+
class _ReplyTopicContextManager:
105+
_token: Optional[Token]
106+
107+
def __init__(self, wrapper: AsyncContextManager[DynamicHandler]):
108+
self.wrapper = wrapper
109+
self._token = None
110+
111+
async def __aenter__(self) -> DynamicHandler:
112+
handler = await self.wrapper.__aenter__()
113+
self._token = REPLY_TOPIC_CONTEXT_VAR.set(handler.topic)
114+
return handler
115+
116+
async def __aexit__(self, exc_type, exc_val, exc_tb):
117+
REPLY_TOPIC_CONTEXT_VAR.reset(self._token)
118+
await self.wrapper.__aexit__(exc_type, exc_val, exc_tb)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "minos_microservice_networks"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
description = "Python Package with the common network classes and utilities used in Minos Microservice."
55
readme = "README.md"
66
repository = "https://github.com/clariteia/minos_microservice_network"

tests/test_networks/test_brokers/test_commands.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
PostgresAsyncTestCase,
1414
)
1515
from minos.networks import (
16+
REPLY_TOPIC_CONTEXT_VAR,
1617
CommandBroker,
1718
)
1819
from tests.utils import (
@@ -65,6 +66,23 @@ async def test_send_with_default_reply_topic(self):
6566
self.assertEqual("fake", args[0])
6667
self.assertEqual(Command("fake", FakeModel("foo"), saga, "OrderReply"), Command.from_avro_bytes(args[1]))
6768

69+
async def test_send_with_reply_topic_context_var(self):
70+
mock = AsyncMock(return_value=56)
71+
saga = uuid4()
72+
73+
REPLY_TOPIC_CONTEXT_VAR.set("onetwothree")
74+
75+
async with CommandBroker.from_config(config=self.config) as broker:
76+
broker.enqueue = mock
77+
identifier = await broker.send(FakeModel("foo"), "fake", saga)
78+
79+
self.assertEqual(56, identifier)
80+
self.assertEqual(1, mock.call_count)
81+
82+
args = mock.call_args.args
83+
self.assertEqual("fake", args[0])
84+
self.assertEqual(Command("fake", FakeModel("foo"), saga, "onetwothree"), Command.from_avro_bytes(args[1]))
85+
6886
async def test_send_with_user(self):
6987
mock = AsyncMock(return_value=56)
7088
saga = uuid4()

0 commit comments

Comments
 (0)