Skip to content

Commit 3e7712b

Browse files
committed
fix proactive sub agent spawning
1 parent df42936 commit 3e7712b

File tree

6 files changed

+272
-51
lines changed

6 files changed

+272
-51
lines changed

crates/example-eventage-claw/src/agent.rs

Lines changed: 129 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ use crate::hooks::{HumanApprovalHook, SecurityGateHook};
1111
use crate::prompt::build_system_prompt;
1212
use crate::streaming::StreamingOpenAiProvider;
1313
use crate::tools::{
14-
AddTaskTool, BrowserTool, CancelTaskTool, CompleteTaskTool, DockerRunCommandTool, EditFileTool,
15-
GlobTool, GrepTool, GroupRegistry, ListGroupsTool, ListSessionTasksTool, ListTasksTool, LsTool,
16-
MessageGroupTool, PauseTaskTool, ReadFileTool, RegisterGroupTool, ScheduleState,
17-
ScheduleTaskTool, TaskState, UpdateTaskTool, WebFetchTool, WebSearchTool, WriteFileTool,
18-
load_tasks, new_group_registry, new_task_state,
14+
AddTaskTool, AgentSpawner, BrowserTool, CancelTaskTool, CompleteTaskTool,
15+
DockerRunCommandTool, EditFileTool, GlobTool, GrepTool, GroupRegistry, ListGroupsTool,
16+
ListSessionTasksTool, ListTasksTool, LsTool, MessageGroupTool, PauseTaskTool, ReadFileTool,
17+
RegisterGroupTool, ScheduleState, ScheduleTaskTool, SpawnGroupTool, TaskState, UpdateTaskTool,
18+
WebFetchTool, WebSearchTool, WriteFileTool, load_tasks, new_group_registry, new_task_state,
19+
};
20+
use crate::workers::{
21+
ChannelOutputWorker, DelegationReplyWorker, GroupBuses, RelayWorker, SchedulerWorker,
1922
};
20-
use crate::workers::{ChannelOutputWorker, DelegationReplyWorker, RelayWorker, SchedulerWorker};
2123
use eventage::{
2224
agent::{ContextAssembler, DefaultContextAssembler},
2325
llm::OpenAiProvider,
@@ -26,7 +28,7 @@ use eventage::{
2628
};
2729
use std::collections::HashMap;
2830
use std::sync::{atomic::AtomicBool, Arc};
29-
use tokio::sync::Mutex;
31+
use tokio::sync::{Mutex, RwLock};
3032
use tracing::info;
3133
use uuid::Uuid;
3234

@@ -186,10 +188,13 @@ impl ClawAgentBuilder {
186188
.map(|g| g.name.clone())
187189
.collect();
188190

189-
// Build per-group buses first so we can pass them to workers
190-
let mut group_buses: HashMap<String, EventBus> = HashMap::new();
191+
// Shared live map: group name → per-group EventBus.
192+
// Arc<RwLock<…>> so dynamically spawned groups are immediately routable.
193+
let group_buses: GroupBuses = Arc::new(RwLock::new(HashMap::new()));
191194
for g in &config.groups {
192-
group_buses.insert(g.name.clone(), EventBus::new());
195+
group_buses
196+
.blocking_write()
197+
.insert(g.name.clone(), EventBus::new());
193198
}
194199

195200
// Build shared workers
@@ -203,12 +208,24 @@ impl ClawAgentBuilder {
203208
group_buses: group_buses.clone(),
204209
});
205210

211+
// Spawner used by SpawnGroupTool — holds everything needed to build a
212+
// new GroupAgent at runtime and insert it into the live routing table.
213+
let spawner: Arc<dyn AgentSpawner> = Arc::new(ClawGroupSpawner {
214+
config: Arc::new(config.clone()),
215+
shared_bus: shared_bus.clone(),
216+
group_buses: group_buses.clone(),
217+
group_registry: group_registry.clone(),
218+
schedule_state: schedule_state.clone(),
219+
session_id_prefix: self.session_id_prefix.clone(),
220+
tui_mode: self.tui_mode,
221+
});
222+
206223
// Build each group agent
207224
let mut groups: HashMap<String, GroupAgent> = HashMap::new();
208225
let active_group_name = config.groups.first().map(|g| g.name.clone()).unwrap_or_default();
209226

210227
for group_config in &config.groups {
211-
let group_bus = group_buses[&group_config.name].clone();
228+
let group_bus = group_buses.blocking_read()[&group_config.name].clone();
212229
let session_id = format!("{}-{}", self.session_id_prefix, group_config.name);
213230
let task_state = new_task_state();
214231

@@ -223,6 +240,7 @@ impl ClawAgentBuilder {
223240
self.tui_mode,
224241
session_id,
225242
task_state,
243+
spawner.clone(),
226244
);
227245

228246
groups.insert(group_config.name.clone(), group_agent);
@@ -239,6 +257,103 @@ impl ClawAgentBuilder {
239257
}
240258
}
241259

260+
// ── ClawGroupSpawner ──────────────────────────────────────────────────────────
261+
262+
/// Implements [`AgentSpawner`] for the claw runtime.
263+
///
264+
/// Held by `SpawnGroupTool` (main group only). On `spawn()`, it builds a full
265+
/// `GroupAgent`, inserts the new bus into the shared routing table, and starts
266+
/// the agent task — all without restarting the process.
267+
struct ClawGroupSpawner {
268+
config: Arc<ClawConfig>,
269+
shared_bus: EventBus,
270+
group_buses: GroupBuses,
271+
group_registry: GroupRegistry,
272+
schedule_state: ScheduleState,
273+
session_id_prefix: String,
274+
tui_mode: bool,
275+
}
276+
277+
#[async_trait::async_trait]
278+
impl AgentSpawner for ClawGroupSpawner {
279+
async fn spawn(&self, name: &str, system_prompt: Option<&str>) -> Result<(), String> {
280+
// Reject duplicates.
281+
if self.group_buses.read().await.contains_key(name) {
282+
return Err(format!("group '{name}' already exists"));
283+
}
284+
285+
let group_bus = EventBus::new();
286+
let session_id = format!("{}-{}", self.session_id_prefix, name);
287+
let task_state = new_task_state();
288+
289+
let group_config = GroupConfig {
290+
name: name.to_string(),
291+
is_main: false,
292+
system_prompt_suffix: system_prompt.map(|s| s.to_string()),
293+
human_approval_tools: vec![],
294+
require_approve_all: false,
295+
work_dir: None,
296+
allowed_senders: vec![],
297+
};
298+
299+
// Snapshot current group names for the new agent's MessageGroupTool hint.
300+
let known_groups: Vec<String> = self.group_registry.lock().await.clone();
301+
302+
// No recursive spawning from spawned sub-agents — pass a no-op spawner.
303+
let no_spawn: Arc<dyn AgentSpawner> = Arc::new(NoopSpawner);
304+
305+
let group_agent = build_group_agent(
306+
&group_config,
307+
&self.config,
308+
group_bus.clone(),
309+
self.shared_bus.clone(),
310+
self.schedule_state.clone(),
311+
self.group_registry.clone(),
312+
&known_groups,
313+
self.tui_mode,
314+
session_id,
315+
task_state,
316+
no_spawn,
317+
);
318+
319+
// Register in routing table before spawning so RelayWorker can route
320+
// the very first message the main agent sends after spawn returns.
321+
self.group_buses.write().await.insert(name.to_string(), group_bus);
322+
self.group_registry.lock().await.push(name.to_string());
323+
324+
let group_name = name.to_string();
325+
let agent = group_agent.agent;
326+
let ws = group_agent.worker_set;
327+
let bus = group_agent.bus;
328+
329+
tokio::spawn(async move {
330+
let worker_bus = bus.clone();
331+
let worker_name = group_name.clone();
332+
tokio::spawn(async move {
333+
if let Err(e) = ws.run_on(worker_bus).await {
334+
tracing::warn!(group = %worker_name, "spawned group worker error: {e}");
335+
}
336+
});
337+
if let Err(e) = agent.run().await {
338+
tracing::warn!(group = %group_name, "spawned group agent exited: {e}");
339+
}
340+
});
341+
342+
info!(group = %name, "ClawGroupSpawner: sub-agent spawned");
343+
Ok(())
344+
}
345+
}
346+
347+
/// Placeholder spawner for sub-agents that should not spawn further agents.
348+
struct NoopSpawner;
349+
350+
#[async_trait::async_trait]
351+
impl AgentSpawner for NoopSpawner {
352+
async fn spawn(&self, name: &str, _system_prompt: Option<&str>) -> Result<(), String> {
353+
Err(format!("sub-agent cannot spawn further agents (requested: '{name}')"))
354+
}
355+
}
356+
242357
// ── build_group_agent ─────────────────────────────────────────────────────────
243358

244359
#[allow(clippy::too_many_arguments)]
@@ -253,6 +368,7 @@ fn build_group_agent(
253368
tui_mode: bool,
254369
session_id: String,
255370
task_state: TaskState,
371+
spawner: Arc<dyn AgentSpawner>,
256372
) -> GroupAgent {
257373
let work_dir = config.group_work_dir(&group_config.name);
258374
let _ = std::fs::create_dir_all(&work_dir);
@@ -398,7 +514,8 @@ fn build_group_agent(
398514
})
399515
.tool(ListGroupsTool {
400516
registry: group_registry.clone(),
401-
});
517+
})
518+
.tool(SpawnGroupTool { spawner });
402519
}
403520

404521
// ── Hook ──────────────────────────────────────────────────────────────────

crates/example-eventage-claw/src/kinds.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ pub const CLAW_SCHEDULE_UPDATE: &str = "claw.schedule.update";
2525
/// `RelayWorker` subscribes and routes the message to the target group's bus.
2626
pub const CLAW_GROUP_MESSAGE: &str = "claw.group.message";
2727

28+
/// Published by `DelegationReplyWorker` when a sub-agent completes a relay request.
29+
/// `MessageGroupTool` (await_reply=true) waits for this on the shared bus.
30+
/// Kept separate from `CLAW_GROUP_MESSAGE` so `RelayWorker` never re-routes replies.
31+
pub const CLAW_GROUP_REPLY: &str = "claw.group.reply";
32+
2833
/// Published by the TUI when the user switches the active group.
2934
pub const CLAW_GROUP_SWITCH: &str = "claw.group.switch";
3035

crates/example-eventage-claw/src/tools/group.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ use tokio::sync::Mutex;
88

99
use crate::kinds::CLAW_GROUP_REGISTER;
1010

11+
// ── AgentSpawner trait ────────────────────────────────────────────────────────
12+
13+
/// Abstracts over the runtime agent-spawning logic so `SpawnGroupTool` does
14+
/// not need to import from `agent.rs` (which would create a circular dep).
15+
#[async_trait]
16+
pub trait AgentSpawner: Send + Sync {
17+
async fn spawn(&self, name: &str, system_prompt: Option<&str>) -> Result<(), String>;
18+
}
19+
1120
/// Shared runtime group registry (name → bool for "active").
1221
pub type GroupRegistry = Arc<Mutex<Vec<String>>>;
1322

@@ -105,3 +114,58 @@ impl Tool for ListGroupsTool {
105114
}))
106115
}
107116
}
117+
118+
// ── SpawnGroupTool ────────────────────────────────────────────────────────────
119+
120+
/// Dynamically spawns a new sub-agent group at runtime.
121+
///
122+
/// The new agent gets its own isolated EventBus, the full tool set, and begins
123+
/// listening immediately — no restart required. `message_group` can reach it
124+
/// the moment this tool returns. Only the main group has this tool.
125+
pub struct SpawnGroupTool {
126+
pub spawner: Arc<dyn AgentSpawner>,
127+
}
128+
129+
#[async_trait]
130+
impl Tool for SpawnGroupTool {
131+
fn definition(&self) -> ToolDefinition {
132+
ToolDefinition::function(
133+
"spawn_group",
134+
"Spawn a new sub-agent group at runtime with an isolated context and event bus. \
135+
The agent starts immediately and can be reached via message_group. \
136+
Only usable by the main group.",
137+
json!({
138+
"type": "object",
139+
"properties": {
140+
"name": {
141+
"type": "string",
142+
"description": "Unique group name for the new agent (e.g. 'researcher', 'coder')."
143+
},
144+
"system_prompt": {
145+
"type": "string",
146+
"description": "System prompt that defines this agent's persona and task focus."
147+
}
148+
},
149+
"required": ["name", "system_prompt"]
150+
}),
151+
)
152+
}
153+
154+
async fn execute(&self, args: Value) -> Result<Value, AgentError> {
155+
let name = args["name"]
156+
.as_str()
157+
.ok_or_else(|| AgentError::Tool("missing 'name'".into()))?;
158+
let prompt = args["system_prompt"].as_str();
159+
160+
self.spawner
161+
.spawn(name, prompt)
162+
.await
163+
.map_err(AgentError::Tool)?;
164+
165+
Ok(json!({
166+
"spawned": true,
167+
"name": name,
168+
"message": format!("Agent '{name}' is running. Use message_group to communicate with it."),
169+
}))
170+
}
171+
}

crates/example-eventage-claw/src/tools/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ pub mod web;
1313
pub use browser::BrowserTool;
1414
pub use docker::DockerRunCommandTool;
1515
pub use fs::{EditFileTool, GlobTool, GrepTool, LsTool, ReadFileTool, WriteFileTool};
16-
pub use group::{new_group_registry, GroupRegistry, ListGroupsTool, RegisterGroupTool};
16+
pub use group::{
17+
new_group_registry, AgentSpawner, GroupRegistry, ListGroupsTool, RegisterGroupTool,
18+
SpawnGroupTool,
19+
};
1720
pub use relay::MessageGroupTool;
1821
pub use schedule::{
1922
load_tasks, CancelTaskTool, ListTasksTool, PauseTaskTool, ScheduleState, ScheduleTaskTool,

crates/example-eventage-claw/src/tools/relay.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
//! Inter-group IPC tool — the key EventBus-as-IPC demonstration.
22
//!
33
//! `MessageGroupTool` lets an agent send a message to another named group.
4-
//! It publishes a `CLAW_GROUP_MESSAGE` event on the shared bus; the
5-
//! `RelayWorker` subscribes and routes it to the target group's per-group bus
6-
//! as a `user.message` event.
4+
//! It publishes a `CLAW_GROUP_MESSAGE` event on the shared bus; `RelayWorker`
5+
//! routes it to the target group's per-group bus as an `agent.message` event.
6+
//! When `await_reply=true`, the tool waits on the shared bus for a matching
7+
//! `CLAW_GROUP_REPLY` published by the target group's `DelegationReplyWorker`.
78
89
use async_trait::async_trait;
910
use eventage::{AgentError, Event, EventBus, Tool, ToolDefinition};
1011
use serde_json::{json, Value};
1112
use std::time::Duration;
1213
use uuid::Uuid;
1314

14-
use crate::kinds::CLAW_GROUP_MESSAGE;
15+
use crate::kinds::{CLAW_GROUP_MESSAGE, CLAW_GROUP_REPLY};
1516

1617
pub struct MessageGroupTool {
1718
/// Shared bus — events published here are visible to all workers.
@@ -90,7 +91,7 @@ impl Tool for MessageGroupTool {
9091

9192
let result = tokio::time::timeout(Duration::from_secs(30), async move {
9293
bus.wait_for(|e: &Event| {
93-
e.kind == CLAW_GROUP_MESSAGE
94+
e.kind == CLAW_GROUP_REPLY
9495
&& e.payload["source_group"].as_str() == Some(&target_owned)
9596
&& e.payload["in_reply_to"].as_str() == Some(&msg_id_clone)
9697
})

0 commit comments

Comments
 (0)