Skip to content

Commit 8bc6430

Browse files
louis030195claude
andcommitted
feat(mcp-agent): broadcast progress notifications to all connected clients
- Add broadcast_peers list to track all connected MCP clients - Forward emit.progress() updates to ALL peers, not just the initiating one - Auto-cleanup dead peers on send failure Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 7300cb0 commit 8bc6430

File tree

2 files changed

+52
-12
lines changed

2 files changed

+52
-12
lines changed

crates/terminator-mcp-agent/src/server.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ impl DesktopWrapper {
812812
inspect_overlay_handle: Arc::new(std::sync::Mutex::new(None)),
813813
client_modes: Arc::new(Mutex::new(std::collections::HashMap::new())),
814814
elicitation_peer: Arc::new(Mutex::new(None)),
815+
broadcast_peers: Arc::new(Mutex::new(Vec::new())),
815816
})
816817
}
817818

@@ -4004,6 +4005,7 @@ DATA PASSING:
40044005

40054006
// Clone peer for the event handler task
40064007
let peer_clone = peer.clone();
4008+
let broadcast_peers_clone = self.broadcast_peers.clone();
40074009
let progress_token_clone = progress_token.clone();
40084010

40094011
// Spawn task to collect screenshot events AND forward progress notifications
@@ -4029,16 +4031,27 @@ DATA PASSING:
40294031
message,
40304032
..
40314033
} => {
4032-
// Forward progress events as MCP notifications
4033-
if let Some(ref p) = peer_clone {
4034-
let _ = p
4035-
.notify_progress(ProgressNotificationParam {
4034+
// Broadcast progress events to ALL connected MCP clients
4035+
// Dead peers are cleaned up on send failure
4036+
let mut peers = broadcast_peers_clone.lock().await;
4037+
let mut dead_indices = Vec::new();
4038+
for (i, p) in peers.iter().enumerate() {
4039+
if p.notify_progress(ProgressNotificationParam {
40364040
progress_token: progress_token_clone.clone(),
40374041
progress: current,
40384042
total,
4039-
message,
4043+
message: message.clone(),
40404044
})
4041-
.await;
4045+
.await
4046+
.is_err()
4047+
{
4048+
dead_indices.push(i);
4049+
}
4050+
}
4051+
// Remove dead peers (reverse order to preserve indices)
4052+
for i in dead_indices.into_iter().rev() {
4053+
peers.remove(i);
4054+
tracing::debug!("[broadcast] Removed dead peer at index {}", i);
40424055
}
40434056
}
40444057
WorkflowEvent::Status { text, .. } => {
@@ -4272,6 +4285,7 @@ DATA PASSING:
42724285

42734286
// Clone peer for the event handler task
42744287
let peer_clone = peer.clone();
4288+
let broadcast_peers_clone2 = self.broadcast_peers.clone();
42754289
let progress_token_clone = progress_token.clone();
42764290

42774291
// Spawn task to collect screenshot events AND forward progress notifications
@@ -4297,16 +4311,27 @@ DATA PASSING:
42974311
message,
42984312
..
42994313
} => {
4300-
// Forward progress events as MCP notifications
4301-
if let Some(ref p) = peer_clone {
4302-
let _ = p
4303-
.notify_progress(ProgressNotificationParam {
4314+
// Broadcast progress events to ALL connected MCP clients
4315+
// Dead peers are cleaned up on send failure
4316+
let mut peers = broadcast_peers_clone2.lock().await;
4317+
let mut dead_indices = Vec::new();
4318+
for (i, p) in peers.iter().enumerate() {
4319+
if p.notify_progress(ProgressNotificationParam {
43044320
progress_token: progress_token_clone.clone(),
43054321
progress: current,
43064322
total,
4307-
message,
4323+
message: message.clone(),
43084324
})
4309-
.await;
4325+
.await
4326+
.is_err()
4327+
{
4328+
dead_indices.push(i);
4329+
}
4330+
}
4331+
// Remove dead peers (reverse order to preserve indices)
4332+
for i in dead_indices.into_iter().rev() {
4333+
peers.remove(i);
4334+
tracing::debug!("[broadcast] Removed dead peer at index {}", i);
43104335
}
43114336
}
43124337
WorkflowEvent::Status { text, .. } => {
@@ -10626,6 +10651,17 @@ impl ServerHandler for DesktopWrapper {
1062610651
);
1062710652
}
1062810653

10654+
// Add peer to broadcast list for progress notifications
10655+
// All connected clients will receive emit.progress() updates
10656+
{
10657+
let mut broadcast_guard = self.broadcast_peers.lock().await;
10658+
broadcast_guard.push(peer.clone());
10659+
tracing::info!(
10660+
"[on_initialized] Added peer to broadcast list. Total peers: {}",
10661+
broadcast_guard.len()
10662+
);
10663+
}
10664+
1062910665
if supports {
1063010666
tracing::info!("[on_initialized] Storing elicitation-capable peer");
1063110667
let mut guard = self.elicitation_peer.lock().await;

crates/terminator-mcp-agent/src/utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,10 @@ pub struct DesktopWrapper {
431431
/// Used to show UI prompts to the user even when tool calls come from a different peer
432432
#[serde(skip)]
433433
pub elicitation_peer: Arc<TokioMutex<Option<Peer<RoleServer>>>>,
434+
/// All connected peers for broadcasting progress notifications
435+
/// When emit.progress() is called, notifications are sent to ALL connected clients
436+
#[serde(skip)]
437+
pub broadcast_peers: Arc<TokioMutex<Vec<Peer<RoleServer>>>>,
434438
}
435439

436440
impl Default for DesktopWrapper {

0 commit comments

Comments
 (0)