Skip to content

Commit 088a9f7

Browse files
committed
Fix CoS examples
1 parent a3dcb04 commit 088a9f7

File tree

3 files changed

+14
-11
lines changed

3 files changed

+14
-11
lines changed

examples/cos/cos.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,22 +122,23 @@ class Agent {
122122

123123
let result = this.handle(msg);
124124

125-
if (!msg.reply) {
125+
const replyAddr = msg.reply? msg.reply.address : undefined;
126+
if (!replyAddr) {
126127
return
127128
}
128129

129130
if (isAsyncIterator(result)) {
130131
for await (let r of result) {
131132
console.log(`partial result: ${JSON.stringify(r)}`);
132-
await this.channel.publish(msg.reply, r);
133+
await this.channel.publish(replyAddr, r);
133134
}
134135
// End of the iteration, send an extra StopIteration message.
135136
const stop = {header: {type: 'StopIteration'}}
136-
await this.channel.publish(msg.reply, stop);
137+
await this.channel.publish(replyAddr, stop);
137138
} else {
138139
result = await result;
139140
console.log(`result: ${JSON.stringify(result)}`);
140-
await this.channel.publish(msg.reply, result);
141+
await this.channel.publish(replyAddr, result);
141142
}
142143
}
143144
}

examples/cos/cos.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,20 @@ async def receive(self, msg: dict) -> None:
111111

112112
result = self.handle(msg)
113113

114-
reply = msg.get("reply")
115-
if not reply:
114+
reply = msg.get("reply") or {}
115+
reply_addr = reply.get("address")
116+
if not reply_addr:
116117
return
117118

118119
if is_async_iterator(result):
119120
async for x in result:
120-
await self.channel.publish(reply, x)
121+
await self.channel.publish(reply_addr, x)
121122
# End of the iteration, send an extra StopIteration message.
122123
stop = {"header": {"type": "StopIteration"}}
123-
await self.channel.publish(reply, stop)
124+
await self.channel.publish(reply_addr, stop)
124125
else:
125126
x = await result
126-
await self.channel.publish(reply, x)
127+
await self.channel.publish(reply_addr, x)
127128

128129
@abc.abstractmethod
129130
async def handle(self, msg: dict) -> Any:

examples/cos/goagent/cos.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,12 @@ func (a *Server) Receive(data string) {
228228
slog.Info("[Server.Receive] handle result.", slog.Any("result", result))
229229

230230
slog.Info("[Server.Receive] reply.", slog.Any("msg.Reply", msg.Reply))
231-
if len(msg.Reply) == 0 {
231+
replyAddr, ok := msg.Reply["address"].(map[string]any)
232+
if !ok || len(replyAddr) == 0 {
232233
return
233234
}
234235

235-
err = a.channel.Publish(msg.Reply, result)
236+
err = a.channel.Publish(replyAddr, result)
236237
if err != nil {
237238
slog.Error("[Server.Receive] Error publishing reply.", slog.Any("err", err), slog.Any("msg.Reply", msg.Reply))
238239
}

0 commit comments

Comments
 (0)