From 77a4afd652fdc3acc982f21485ff09dbaec3e545 Mon Sep 17 00:00:00 2001 From: "ilblackdragon@gmail.com" Date: Sat, 7 Mar 2026 18:11:33 -0800 Subject: [PATCH 1/4] fix(routines): resolve message tool channel/target from per-job metadata When a routine's notify.channel is None, the message tool had no way to resolve channel/target for full-job workers, causing "No target specified" errors. The previous approach mutated shared global state via set_message_tool_context(), which also raced with concurrent jobs. Now the routine's notify config (channel + user) is carried in the job's metadata JSON, and MessageTool::execute falls back to ctx.metadata when neither explicit params nor conversation defaults are available. This eliminates both the None-channel bug and the concurrent-job race. Co-Authored-By: Claude Opus 4.6 --- src/agent/routine_engine.rs | 18 +++--- src/tools/builtin/message.rs | 106 +++++++++++++++++++++++++++-------- 2 files changed, 91 insertions(+), 33 deletions(-) diff --git a/src/agent/routine_engine.rs b/src/agent/routine_engine.rs index da22ffc1..b5b98be0 100644 --- a/src/agent/routine_engine.rs +++ b/src/agent/routine_engine.rs @@ -488,18 +488,16 @@ async fn execute_full_job( reason: "scheduler not available".to_string(), })?; - // Set the message tool's default channel/target from the routine's notify config - // so the LLM can send results without triggering cross-channel approval. - // TODO: This mutates shared global state and can race with concurrent jobs. - // Move notify config into JobContext metadata and apply per-job instead. + // Notify config is carried in job metadata (see above) and read by + // MessageTool::execute from JobContext — no global state mutation needed. + + let mut metadata = serde_json::json!({ "max_iterations": max_iterations }); + // Carry the routine's notify config in job metadata so the message tool + // can resolve channel/target per-job without global state mutation. if let Some(channel) = &routine.notify.channel { - scheduler - .tools() - .set_message_tool_context(Some(channel.clone()), Some(routine.notify.user.clone())) - .await; + metadata["notify_channel"] = serde_json::json!(channel); } - - let metadata = serde_json::json!({ "max_iterations": max_iterations }); + metadata["notify_user"] = serde_json::json!(&routine.notify.user); // Build approval context: UnlessAutoApproved tools are auto-approved for routines; // Always tools require explicit listing in tool_permissions. diff --git a/src/tools/builtin/message.rs b/src/tools/builtin/message.rs index 4259d3dd..67ba06d9 100644 --- a/src/tools/builtin/message.rs +++ b/src/tools/builtin/message.rs @@ -105,42 +105,52 @@ impl Tool for MessageTool { async fn execute( &self, params: serde_json::Value, - _ctx: &JobContext, + ctx: &JobContext, ) -> Result { let start = std::time::Instant::now(); let content = require_str(¶ms, "content")?; - // Get channel: use param or fall back to default + // Get channel: use param → conversation default → job metadata let channel = if let Some(c) = params.get("channel").and_then(|v| v.as_str()) { c.to_string() + } else if let Some(c) = self + .default_channel + .read() + .unwrap_or_else(|e| e.into_inner()) + .clone() + { + c + } else if let Some(c) = ctx + .metadata + .get("notify_channel") + .and_then(|v| v.as_str()) + { + c.to_string() } else { - self.default_channel - .read() - .unwrap_or_else(|e| e.into_inner()) - .clone() - .ok_or_else(|| { - ToolError::ExecutionFailed( - "No channel specified and no active conversation. Provide channel parameter." - .to_string(), - ) - })? + return Err(ToolError::ExecutionFailed( + "No channel specified and no active conversation. Provide channel parameter." + .to_string(), + )); }; - // Get target: use param or fall back to default + // Get target: use param → conversation default → job metadata let target = if let Some(t) = params.get("target").and_then(|v| v.as_str()) { t.to_string() + } else if let Some(t) = self + .default_target + .read() + .unwrap_or_else(|e| e.into_inner()) + .clone() + { + t + } else if let Some(t) = ctx.metadata.get("notify_user").and_then(|v| v.as_str()) { + t.to_string() } else { - self.default_target - .read() - .unwrap_or_else(|e| e.into_inner()) - .clone() - .ok_or_else(|| { - ToolError::ExecutionFailed( - "No target specified and no active conversation. Provide target parameter." - .to_string(), - ) - })? + return Err(ToolError::ExecutionFailed( + "No target specified and no active conversation. Provide target parameter." + .to_string(), + )); }; let attachments: Vec = match params.get("attachments") { @@ -576,4 +586,54 @@ mod tests { ApprovalRequirement::Never, ); } + + #[tokio::test] + async fn message_tool_falls_back_to_job_metadata() { + // Regression: when no conversation context is set (e.g. routine full-job), + // the message tool should fall back to notify_channel/notify_user from + // JobContext metadata instead of returning "No target specified". + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + + let mut ctx = crate::context::JobContext::new("routine-job", "price alert"); + ctx.metadata = serde_json::json!({ + "notify_channel": "telegram", + "notify_user": "123456789", + }); + + // No set_context called — simulates a routine full-job worker + let result = tool + .execute(serde_json::json!({"content": "NEAR price is $5"}), &ctx) + .await; + + // Should fail at channel broadcast (no real channel), NOT at + // "No target specified and no active conversation" + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + !err.contains("No target specified"), + "Should not get 'No target specified' when metadata has notify_user, got: {}", + err + ); + assert!( + !err.contains("No channel specified"), + "Should not get 'No channel specified' when metadata has notify_channel, got: {}", + err + ); + } + + #[tokio::test] + async fn message_tool_no_metadata_still_errors() { + // When neither conversation context nor metadata is set, should still + // return a clear error. + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + let ctx = crate::context::JobContext::new("orphan-job", "no notify config"); + + let result = tool + .execute(serde_json::json!({"content": "hello"}), &ctx) + .await; + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("No channel specified") || err.contains("No target specified")); + } } From 97ffa3f8cd67bdcb2deea39f257dd2fc9f926783 Mon Sep 17 00:00:00 2001 From: "ilblackdragon@gmail.com" Date: Sat, 7 Mar 2026 18:15:58 -0800 Subject: [PATCH 2/4] style: apply cargo fmt Co-Authored-By: Claude Opus 4.6 --- src/tools/builtin/message.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/tools/builtin/message.rs b/src/tools/builtin/message.rs index 67ba06d9..757a31e7 100644 --- a/src/tools/builtin/message.rs +++ b/src/tools/builtin/message.rs @@ -121,11 +121,7 @@ impl Tool for MessageTool { .clone() { c - } else if let Some(c) = ctx - .metadata - .get("notify_channel") - .and_then(|v| v.as_str()) - { + } else if let Some(c) = ctx.metadata.get("notify_channel").and_then(|v| v.as_str()) { c.to_string() } else { return Err(ToolError::ExecutionFailed( From 547d34560d5b44de9e38086d3df9463d4548c18d Mon Sep 17 00:00:00 2001 From: "ilblackdragon@gmail.com" Date: Sat, 7 Mar 2026 19:04:12 -0800 Subject: [PATCH 3/4] fix(message): broadcast to all channels when notify.channel is None MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback: - Fix stale "see above" comment → "populated below" - When notify.channel is None, use broadcast_all instead of erroring with "No channel specified". This matches NotifyConfig semantics where channel=None means "broadcast to all channels" - Channel resolution is now Option: param → default → metadata → None - When None, MessageTool uses ChannelManager::broadcast_all(target, response) and reports which channels succeeded/failed - Add regression test for broadcast-all behavior Co-Authored-By: Claude Opus 4.6 --- src/agent/routine_engine.rs | 2 +- src/tools/builtin/message.rs | 167 ++++++++++++++++++++++++++--------- 2 files changed, 124 insertions(+), 45 deletions(-) diff --git a/src/agent/routine_engine.rs b/src/agent/routine_engine.rs index b5b98be0..272b8e83 100644 --- a/src/agent/routine_engine.rs +++ b/src/agent/routine_engine.rs @@ -488,7 +488,7 @@ async fn execute_full_job( reason: "scheduler not available".to_string(), })?; - // Notify config is carried in job metadata (see above) and read by + // Notify config is carried in job metadata (populated below) and read by // MessageTool::execute from JobContext — no global state mutation needed. let mut metadata = serde_json::json!({ "max_iterations": max_iterations }); diff --git a/src/tools/builtin/message.rs b/src/tools/builtin/message.rs index 757a31e7..a62df360 100644 --- a/src/tools/builtin/message.rs +++ b/src/tools/builtin/message.rs @@ -111,24 +111,23 @@ impl Tool for MessageTool { let content = require_str(¶ms, "content")?; - // Get channel: use param → conversation default → job metadata - let channel = if let Some(c) = params.get("channel").and_then(|v| v.as_str()) { - c.to_string() - } else if let Some(c) = self - .default_channel - .read() - .unwrap_or_else(|e| e.into_inner()) - .clone() - { - c - } else if let Some(c) = ctx.metadata.get("notify_channel").and_then(|v| v.as_str()) { - c.to_string() - } else { - return Err(ToolError::ExecutionFailed( - "No channel specified and no active conversation. Provide channel parameter." - .to_string(), - )); - }; + // Get channel: use param → conversation default → job metadata → None (broadcast all) + let channel: Option = + if let Some(c) = params.get("channel").and_then(|v| v.as_str()) { + Some(c.to_string()) + } else if let Some(c) = self + .default_channel + .read() + .unwrap_or_else(|e| e.into_inner()) + .clone() + { + Some(c) + } else { + ctx.metadata + .get("notify_channel") + .and_then(|v| v.as_str()) + .map(|c| c.to_string()) + }; // Get target: use param → conversation default → job metadata let target = if let Some(t) = params.get("target").and_then(|v| v.as_str()) { @@ -187,36 +186,80 @@ impl Tool for MessageTool { response = response.with_attachments(attachments); } - match self - .channel_manager - .broadcast(&channel, &target, response) - .await - { - Ok(()) => { - tracing::info!( - message_sent = true, - channel = %channel, - target = %target, - attachments = attachment_count, - "Message sent via message tool" - ); - let msg = format!("Sent message to {}:{}", channel, target); - Ok(ToolOutput::text(msg, start.elapsed())) + if let Some(ref channel) = channel { + // Send to a specific channel + match self + .channel_manager + .broadcast(channel, &target, response) + .await + { + Ok(()) => { + tracing::info!( + message_sent = true, + channel = %channel, + target = %target, + attachments = attachment_count, + "Message sent via message tool" + ); + let msg = format!("Sent message to {}:{}", channel, target); + Ok(ToolOutput::text(msg, start.elapsed())) + } + Err(e) => { + let available = self.channel_manager.channel_names().await.join(", "); + let err_msg = if available.is_empty() { + format!( + "Failed to send to {}:{}: {}. No channels connected.", + channel, target, e + ) + } else { + format!( + "Failed to send to {}:{}. Available channels: {}. Error: {}", + channel, target, available, e + ) + }; + Err(ToolError::ExecutionFailed(err_msg)) + } + } + } else { + // No channel specified — broadcast to all channels (routine with notify.channel = None) + let results = self.channel_manager.broadcast_all(&target, response).await; + let mut succeeded = Vec::new(); + let mut failed = Vec::new(); + for (ch, result) in &results { + match result { + Ok(()) => succeeded.push(ch.as_str()), + Err(e) => { + tracing::warn!( + channel = %ch, + target = %target, + "broadcast_all: channel failed: {}", e + ); + failed.push(ch.as_str()); + } + } } - Err(e) => { + if succeeded.is_empty() { let available = self.channel_manager.channel_names().await.join(", "); let err_msg = if available.is_empty() { - format!( - "Failed to send to {}:{}: {}. No channels connected.", - channel, target, e - ) + "No channels connected.".to_string() } else { - format!( - "Failed to send to {}:{}. Available channels: {}. Error: {}", - channel, target, available, e - ) + format!("All channels failed. Available: {}", available) }; Err(ToolError::ExecutionFailed(err_msg)) + } else { + tracing::info!( + message_sent = true, + channels = ?succeeded, + target = %target, + attachments = attachment_count, + "Message broadcast via message tool" + ); + let msg = format!( + "Broadcast message to {} (target: {})", + succeeded.join(", "), + target + ); + Ok(ToolOutput::text(msg, start.elapsed())) } } } @@ -620,7 +663,7 @@ mod tests { #[tokio::test] async fn message_tool_no_metadata_still_errors() { // When neither conversation context nor metadata is set, should still - // return a clear error. + // return a clear error (target resolution fails). let tool = MessageTool::new(Arc::new(ChannelManager::new())); let ctx = crate::context::JobContext::new("orphan-job", "no notify config"); @@ -630,6 +673,42 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err().to_string(); - assert!(err.contains("No channel specified") || err.contains("No target specified")); + assert!( + err.contains("No target specified"), + "Expected 'No target specified' error, got: {}", + err + ); + } + + #[tokio::test] + async fn message_tool_broadcasts_all_when_no_channel() { + // Regression: when notify.channel is None but notify_user is set, + // the message tool should attempt broadcast_all instead of erroring + // with "No channel specified". + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + + let mut ctx = crate::context::JobContext::new("routine-job", "price alert"); + ctx.metadata = serde_json::json!({ + "notify_user": "123456789", + }); + + let result = tool + .execute(serde_json::json!({"content": "NEAR price is $5"}), &ctx) + .await; + + // Should fail because no channels are registered (empty ChannelManager), + // NOT because "No channel specified". + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + !err.contains("No channel specified"), + "Should not get 'No channel specified' when broadcasting, got: {}", + err + ); + assert!( + err.contains("No channels connected") || err.contains("All channels failed"), + "Expected channel delivery error, got: {}", + err + ); } } From 0487fcb1f3416c1941c70ed0b4cd155011a7256a Mon Sep 17 00:00:00 2001 From: "ilblackdragon@gmail.com" Date: Sat, 7 Mar 2026 19:14:48 -0800 Subject: [PATCH 4/4] fix: use failed channels in error message, remove redundant comment Address review feedback: - Use `failed` vec in error message instead of re-querying channel_names - Remove redundant orphaned comment block in routine_engine.rs Co-Authored-By: Claude Opus 4.6 --- src/agent/routine_engine.rs | 3 --- src/tools/builtin/message.rs | 7 +++---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/agent/routine_engine.rs b/src/agent/routine_engine.rs index 272b8e83..ab4fbeeb 100644 --- a/src/agent/routine_engine.rs +++ b/src/agent/routine_engine.rs @@ -488,9 +488,6 @@ async fn execute_full_job( reason: "scheduler not available".to_string(), })?; - // Notify config is carried in job metadata (populated below) and read by - // MessageTool::execute from JobContext — no global state mutation needed. - let mut metadata = serde_json::json!({ "max_iterations": max_iterations }); // Carry the routine's notify config in job metadata so the message tool // can resolve channel/target per-job without global state mutation. diff --git a/src/tools/builtin/message.rs b/src/tools/builtin/message.rs index a62df360..53d16e78 100644 --- a/src/tools/builtin/message.rs +++ b/src/tools/builtin/message.rs @@ -224,7 +224,7 @@ impl Tool for MessageTool { // No channel specified — broadcast to all channels (routine with notify.channel = None) let results = self.channel_manager.broadcast_all(&target, response).await; let mut succeeded = Vec::new(); - let mut failed = Vec::new(); + let mut failed: Vec<&str> = Vec::new(); for (ch, result) in &results { match result { Ok(()) => succeeded.push(ch.as_str()), @@ -239,11 +239,10 @@ impl Tool for MessageTool { } } if succeeded.is_empty() { - let available = self.channel_manager.channel_names().await.join(", "); - let err_msg = if available.is_empty() { + let err_msg = if failed.is_empty() { "No channels connected.".to_string() } else { - format!("All channels failed. Available: {}", available) + format!("All channels failed: {}", failed.join(", ")) }; Err(ToolError::ExecutionFailed(err_msg)) } else {