@@ -12,7 +12,7 @@ use eventage::{
1212 EventBus ,
1313} ;
1414use serde_json:: json;
15- use std:: collections:: HashMap ;
15+ use std:: collections:: { HashMap , VecDeque } ;
1616use std:: sync:: Arc ;
1717use tokio:: sync:: Mutex ;
1818use tracing:: { debug, info, warn} ;
@@ -297,9 +297,9 @@ impl EventWorker for RelayWorker {
297297 if let Some ( target_bus) = self . group_buses . get ( target) {
298298 target_bus
299299 . publish ( Event :: new (
300- kinds:: USER_MESSAGE ,
300+ kinds:: AGENT_MESSAGE ,
301301 json ! ( {
302- "text" : format!( "[Message from group '{source}' ]\n {content}" ) ,
302+ "text" : format!( "[Agent '{}' says ]\n {content}" , source ) ,
303303 "source_group" : source,
304304 "message_id" : msg_id,
305305 "via" : "relay" ,
@@ -314,3 +314,112 @@ impl EventWorker for RelayWorker {
314314 Ok ( ( ) )
315315 }
316316}
317+
318+ // ── DelegationReplyWorker ─────────────────────────────────────────────────────
319+
320+ /// Runs on each group's **per-group bus**.
321+ ///
322+ /// Closes the reply loop for `MessageGroupTool(await_reply=true)`:
323+ /// 1. Detects inbound relay requests (`agent.message` with `via: "relay"`)
324+ /// 2. Captures the agent's final response (`assistant.message` content)
325+ /// 3. On `agent.cycle.end`, publishes `CLAW_GROUP_MESSAGE` on the shared bus
326+ /// with `in_reply_to` set, unblocking the calling agent's `await_reply`.
327+ ///
328+ /// A `VecDeque` queue handles back-to-back delegations arriving before the
329+ /// agent finishes its current cycle.
330+ pub struct DelegationReplyWorker {
331+ pub shared_bus : EventBus ,
332+ pub group_name : String ,
333+ pending : Arc < Mutex < VecDeque < PendingReply > > > ,
334+ last_content : Arc < Mutex < Option < String > > > ,
335+ }
336+
337+ struct PendingReply {
338+ source_group : String ,
339+ message_id : String ,
340+ }
341+
342+ impl DelegationReplyWorker {
343+ pub fn new ( shared_bus : EventBus , group_name : impl Into < String > ) -> Self {
344+ Self {
345+ shared_bus,
346+ group_name : group_name. into ( ) ,
347+ pending : Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ,
348+ last_content : Arc :: new ( Mutex :: new ( None ) ) ,
349+ }
350+ }
351+ }
352+
353+ #[ async_trait]
354+ impl EventWorker for DelegationReplyWorker {
355+ fn subscribed_kinds ( & self ) -> Vec < String > {
356+ vec ! [
357+ kinds:: AGENT_MESSAGE . to_string( ) ,
358+ kinds:: ASSISTANT_MESSAGE . to_string( ) ,
359+ kinds:: AGENT_CYCLE_END . to_string( ) ,
360+ ]
361+ }
362+
363+ async fn handle ( & self , event : & Event , _bus : & EventBus ) -> Result < ( ) , WorkerError > {
364+ match event. kind . as_str ( ) {
365+ k if k == kinds:: AGENT_MESSAGE => {
366+ // Only track relay requests, not broadcasts from this group's own agent.
367+ if event. payload . get ( "via" ) . and_then ( |v| v. as_str ( ) ) != Some ( "relay" ) {
368+ return Ok ( ( ) ) ;
369+ }
370+ let source = event. payload [ "source_group" ] . as_str ( ) . unwrap_or ( "" ) . to_string ( ) ;
371+ let msg_id = event. payload [ "message_id" ] . as_str ( ) . unwrap_or ( "" ) . to_string ( ) ;
372+ if !msg_id. is_empty ( ) {
373+ debug ! (
374+ group = %self . group_name,
375+ source = %source,
376+ msg_id = %msg_id,
377+ "DelegationReplyWorker: queued relay request"
378+ ) ;
379+ self . pending . lock ( ) . await . push_back ( PendingReply { source_group : source, message_id : msg_id } ) ;
380+ }
381+ }
382+
383+ k if k == kinds:: ASSISTANT_MESSAGE => {
384+ if let Some ( content) = event. payload . get ( "content" ) . and_then ( |v| v. as_str ( ) ) {
385+ if !content. trim ( ) . is_empty ( ) {
386+ * self . last_content . lock ( ) . await = Some ( content. to_string ( ) ) ;
387+ }
388+ }
389+ }
390+
391+ k if k == kinds:: AGENT_CYCLE_END => {
392+ let reply = self . pending . lock ( ) . await . pop_front ( ) ;
393+ let Some ( PendingReply { source_group, message_id } ) = reply else {
394+ return Ok ( ( ) ) ;
395+ } ;
396+
397+ let content = self . last_content . lock ( ) . await . take ( ) . unwrap_or_default ( ) ;
398+
399+ info ! (
400+ group = %self . group_name,
401+ target = %source_group,
402+ msg_id = %message_id,
403+ "DelegationReplyWorker: routing reply back to caller"
404+ ) ;
405+
406+ self . shared_bus
407+ . publish ( Event :: new (
408+ CLAW_GROUP_MESSAGE ,
409+ json ! ( {
410+ "target_group" : source_group,
411+ "source_group" : self . group_name,
412+ "content" : content,
413+ "in_reply_to" : message_id,
414+ } ) ,
415+ ) )
416+ . await
417+ . map_err ( WorkerError :: Bus ) ?;
418+ }
419+
420+ _ => { }
421+ }
422+
423+ Ok ( ( ) )
424+ }
425+ }
0 commit comments