Skip to content

Commit df0856f

Browse files
tarbaigtariq
andauthored
FIX Record instances deserialize properly when returned by agent.ask (#152)
* FIX Record instances deserialize properly when returned by agent.ask Faust uses an internal model store to keep track of classes deriving from ```Record```. When sent via Kafka those classes are transparently reconstructed from the deserialized data. The corresponding logic resides in ```faust.models.base.maybe_model``` which must be used to wrap the reply value of ```faust.agents.replies.ReplyConsumer._drain_replies```. * fixed linting Co-authored-by: tariq <tariq@attariq.de>
1 parent bef8710 commit df0856f

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

faust/agents/replies.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
__all__ = ["ReplyPromise", "BarrierState", "ReplyConsumer"]
1414

15+
from ..models import maybe_model
16+
1517

1618
class ReplyTuple(NamedTuple):
1719
correlation_id: str
@@ -181,7 +183,7 @@ async def _start_fetcher(self, topic_name: str) -> None:
181183
async def _drain_replies(self, channel: ChannelT) -> None:
182184
async for reply in channel.stream():
183185
for promise in self._waiting[reply.correlation_id]:
184-
promise.fulfill(reply.correlation_id, reply.value)
186+
promise.fulfill(reply.correlation_id, maybe_model(reply.value))
185187

186188
def _reply_topic(self, topic: str) -> TopicT:
187189
return self.app.topic(

tests/unit/agents/test_replies.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
import asyncio
2+
import json
23

34
import pytest
45
from mode.utils.mocks import AsyncMock, Mock
56

7+
from faust import Record
68
from faust.agents.models import ReqRepResponse
79
from faust.agents.replies import BarrierState, ReplyConsumer, ReplyPromise
810

911

12+
class Account(Record, serializer="json"):
13+
id: str
14+
name: str
15+
active: bool = True
16+
17+
1018
def test_ReplyPromise():
1119
r = ReplyPromise(reply_to="rt", correlation_id="id1")
1220
assert r.reply_to == "rt"
@@ -207,24 +215,35 @@ async def test_start_fetcher(self, *, c):
207215

208216
@pytest.mark.asyncio
209217
async def test_drain_replies(self, *, c):
218+
an_account = Account(id="1", name="aName", active=False)
210219
responses = [
211220
ReqRepResponse(key="key1", value="value1", correlation_id="id1"),
212221
ReqRepResponse(key="key2", value="value2", correlation_id="id2"),
222+
ReqRepResponse.from_data(
223+
json.loads(
224+
ReqRepResponse(
225+
key="key3", value=an_account, correlation_id="id3"
226+
).dumps()
227+
)
228+
),
213229
]
214230
channel = Mock(
215231
stream=Mock(return_value=self._response_stream(responses)),
216232
)
217233
p1 = Mock()
218234
p2 = Mock()
219235
p3 = Mock()
236+
p4 = Mock()
220237
c._waiting["id1"] = {p1, p2}
221238
c._waiting["id2"] = {p3}
239+
c._waiting["id3"] = {p4}
222240

223241
await c._drain_replies(channel)
224242

225243
p1.fulfill.assert_called_once_with("id1", "value1")
226244
p2.fulfill.assert_called_once_with("id1", "value1")
227245
p3.fulfill.assert_called_once_with("id2", "value2")
246+
p4.fulfill.assert_called_once_with("id3", an_account)
228247

229248
async def _response_stream(self, responses):
230249
for response in responses:

0 commit comments

Comments
 (0)