Skip to content

Commit 7c68468

Browse files
committed
Always call start() and stop() of BaseAgent in its subclasses
1 parent e9bb70c commit 7c68468

File tree

3 files changed

+34
-34
lines changed

3 files changed

+34
-34
lines changed

coagent/core/agent.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,26 +161,31 @@ async def start(self) -> None:
161161
"""Start the current agent."""
162162

163163
# Subscribe the agent to its own address.
164-
self._sub = await self.channel.subscribe(self.address, handler=self.receive)
165-
166-
self._handle_data_task = asyncio.create_task(self._handle_data())
164+
self._sub = await self._create_subscription()
167165

168166
# Send a `Started` message to the current agent.
169167
await self.channel.publish(self.address, Started().encode(), probe=False)
170168

169+
if not self._handle_data_task:
170+
self._handle_data_task = asyncio.create_task(self._handle_data())
171+
172+
async def _create_subscription(self) -> Subscription:
173+
# Subscribe the agent's receive method to its own address.
174+
return await self.channel.subscribe(self.address, handler=self.receive)
175+
171176
async def stop(self) -> None:
172177
"""Stop the current agent."""
173178

174179
# Send a `Stopped` message to the current agent.
175180
await self.channel.publish(self.address, Stopped().encode(), probe=False)
176181

177-
if self._handle_data_task:
178-
self._handle_data_task.cancel()
179-
180182
# Unsubscribe the agent from its own address.
181183
if self._sub:
182184
await self._sub.unsubscribe()
183185

186+
if self._handle_data_task:
187+
self._handle_data_task.cancel()
188+
184189
async def started(self) -> None:
185190
"""This handler is called after the agent is started."""
186191
pass

coagent/core/discovery.py

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
Address,
1212
AgentSpec,
1313
RawMessage,
14+
Subscription,
1415
)
1516
from .util import Trie
1617

@@ -125,27 +126,28 @@ def __init__(self):
125126

126127
async def start(self) -> None:
127128
"""Since discovery is a special agent, we need to start it in a different way."""
128-
129-
# Each query message can only be received and handled by one discovery aggregator.
130-
self._sub = await self.channel.subscribe(
131-
self.address,
132-
handler=self.receive,
133-
queue=f"{self.address.topic}_workers",
134-
)
129+
await super().start()
135130

136131
# Create and start the local discovery server.
137132
self._server = DiscoveryServer()
138133
# We MUST set the channel and address manually.
139134
self._server.init(self.channel, Address(name=f"{self.address.name}.server"))
140135
await self._server.start()
141136

137+
async def _create_subscription(self) -> Subscription:
138+
# Each query message can only be received and handled by one discovery aggregator.
139+
return await self.channel.subscribe(
140+
self.address,
141+
handler=self.receive,
142+
queue=f"{self.address.topic}_workers",
143+
)
144+
142145
async def stop(self) -> None:
143146
"""Since discovery is a special agent, we need to stop it in a different way."""
144147
if self._server:
145148
await self._server.stop()
146149

147-
if self._sub:
148-
await self._sub.unsubscribe()
150+
await super().stop()
149151

150152
async def register(self, spec: AgentSpec) -> None:
151153
if spec.name == self.address.name:
@@ -250,9 +252,7 @@ def __init__(self):
250252

251253
async def start(self) -> None:
252254
"""Since discovery server is a special agent, we need to start it in a different way."""
253-
254-
# Subscribe the agent to its own address.
255-
self._sub = await self.channel.subscribe(self.address, handler=self.receive)
255+
await super().start()
256256

257257
# Upon startup, the current discovery server has no agent-subscriptions.
258258
# Therefore, it's necessary to synchronize the existing agent-subscriptions
@@ -283,13 +283,6 @@ async def receive(raw: RawMessage) -> None:
283283
finally:
284284
await sub.unsubscribe()
285285

286-
async def stop(self) -> None:
287-
"""Since discovery server is a special agent, we need to stop it in a different way."""
288-
289-
# Unsubscribe the agent from its own address.
290-
if self._sub:
291-
await self._sub.unsubscribe()
292-
293286
async def register(self, spec: AgentSpec) -> None:
294287
if spec.name == self.address.name:
295288
raise ValueError(f"Agent type '{self.address.name}' is reserved")

coagent/core/factory.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
Agent,
99
AgentSpec,
1010
State,
11+
Subscription,
1112
)
1213

1314

@@ -42,25 +43,24 @@ def __init__(self, spec: AgentSpec) -> None:
4243

4344
async def start(self) -> None:
4445
"""Since factory is a special agent, we need to start it in a different way."""
45-
# Subscribe the factory to the given address.
46+
await super().start()
47+
48+
# Start the recycle loop.
49+
self._recycle_task = asyncio.create_task(self._recycle())
50+
51+
async def _create_subscription(self) -> Subscription:
52+
# Each CreateAgent message can only be received and handled by one factory agent.
4653
#
4754
# Note that we specify a queue parameter to distribute requests among
4855
# multiple factory agents of the same type of primitive agent.
49-
self._sub = await self.channel.subscribe(
56+
return await self.channel.subscribe(
5057
self.address,
5158
handler=self.receive,
5259
queue=f"{self.address.topic}_workers",
5360
)
5461

55-
# Start the recycle loop.
56-
self._recycle_task = asyncio.create_task(self._recycle())
57-
5862
async def stop(self) -> None:
5963
"""Since factory is a special agent, we need to stop it in a different way."""
60-
# Unsubscribe the factory from the address.
61-
if self._sub:
62-
await self._sub.unsubscribe()
63-
6464
# Stop all agents.
6565
for agent in self._agents.values():
6666
await agent.stop()
@@ -70,6 +70,8 @@ async def stop(self) -> None:
7070
if self._recycle_task:
7171
self._recycle_task.cancel()
7272

73+
await super().stop()
74+
7375
async def _recycle(self) -> None:
7476
"""The recycle loop for deleting idle agents."""
7577
while True:

0 commit comments

Comments
 (0)