diff --git a/src/agent/routine_engine.rs b/src/agent/routine_engine.rs index da22ffc1..ab4fbeeb 100644 --- a/src/agent/routine_engine.rs +++ b/src/agent/routine_engine.rs @@ -488,18 +488,13 @@ async fn execute_full_job( reason: "scheduler not available".to_string(), })?; - // Set the message tool's default channel/target from the routine's notify config - // so the LLM can send results without triggering cross-channel approval. - // TODO: This mutates shared global state and can race with concurrent jobs. - // Move notify config into JobContext metadata and apply per-job instead. + let mut metadata = serde_json::json!({ "max_iterations": max_iterations }); + // Carry the routine's notify config in job metadata so the message tool + // can resolve channel/target per-job without global state mutation. if let Some(channel) = &routine.notify.channel { - scheduler - .tools() - .set_message_tool_context(Some(channel.clone()), Some(routine.notify.user.clone())) - .await; + metadata["notify_channel"] = serde_json::json!(channel); } - - let metadata = serde_json::json!({ "max_iterations": max_iterations }); + metadata["notify_user"] = serde_json::json!(&routine.notify.user); // Build approval context: UnlessAutoApproved tools are auto-approved for routines; // Always tools require explicit listing in tool_permissions. diff --git a/src/tools/builtin/message.rs b/src/tools/builtin/message.rs index 4259d3dd..53d16e78 100644 --- a/src/tools/builtin/message.rs +++ b/src/tools/builtin/message.rs @@ -105,42 +105,47 @@ impl Tool for MessageTool { async fn execute( &self, params: serde_json::Value, - _ctx: &JobContext, + ctx: &JobContext, ) -> Result { let start = std::time::Instant::now(); let content = require_str(¶ms, "content")?; - // Get channel: use param or fall back to default - let channel = if let Some(c) = params.get("channel").and_then(|v| v.as_str()) { - c.to_string() - } else { - self.default_channel + // Get channel: use param → conversation default → job metadata → None (broadcast all) + let channel: Option = + if let Some(c) = params.get("channel").and_then(|v| v.as_str()) { + Some(c.to_string()) + } else if let Some(c) = self + .default_channel .read() .unwrap_or_else(|e| e.into_inner()) .clone() - .ok_or_else(|| { - ToolError::ExecutionFailed( - "No channel specified and no active conversation. Provide channel parameter." - .to_string(), - ) - })? - }; - - // Get target: use param or fall back to default + { + Some(c) + } else { + ctx.metadata + .get("notify_channel") + .and_then(|v| v.as_str()) + .map(|c| c.to_string()) + }; + + // Get target: use param → conversation default → job metadata let target = if let Some(t) = params.get("target").and_then(|v| v.as_str()) { t.to_string() + } else if let Some(t) = self + .default_target + .read() + .unwrap_or_else(|e| e.into_inner()) + .clone() + { + t + } else if let Some(t) = ctx.metadata.get("notify_user").and_then(|v| v.as_str()) { + t.to_string() } else { - self.default_target - .read() - .unwrap_or_else(|e| e.into_inner()) - .clone() - .ok_or_else(|| { - ToolError::ExecutionFailed( - "No target specified and no active conversation. Provide target parameter." - .to_string(), - ) - })? + return Err(ToolError::ExecutionFailed( + "No target specified and no active conversation. Provide target parameter." + .to_string(), + )); }; let attachments: Vec = match params.get("attachments") { @@ -181,37 +186,80 @@ impl Tool for MessageTool { response = response.with_attachments(attachments); } - match self - .channel_manager - .broadcast(&channel, &target, response) - .await - { - Ok(()) => { + if let Some(ref channel) = channel { + // Send to a specific channel + match self + .channel_manager + .broadcast(channel, &target, response) + .await + { + Ok(()) => { + tracing::info!( + message_sent = true, + channel = %channel, + target = %target, + attachments = attachment_count, + "Message sent via message tool" + ); + let msg = format!("Sent message to {}:{}", channel, target); + Ok(ToolOutput::text(msg, start.elapsed())) + } + Err(e) => { + let available = self.channel_manager.channel_names().await.join(", "); + let err_msg = if available.is_empty() { + format!( + "Failed to send to {}:{}: {}. No channels connected.", + channel, target, e + ) + } else { + format!( + "Failed to send to {}:{}. Available channels: {}. Error: {}", + channel, target, available, e + ) + }; + Err(ToolError::ExecutionFailed(err_msg)) + } + } + } else { + // No channel specified — broadcast to all channels (routine with notify.channel = None) + let results = self.channel_manager.broadcast_all(&target, response).await; + let mut succeeded = Vec::new(); + let mut failed: Vec<&str> = Vec::new(); + for (ch, result) in &results { + match result { + Ok(()) => succeeded.push(ch.as_str()), + Err(e) => { + tracing::warn!( + channel = %ch, + target = %target, + "broadcast_all: channel failed: {}", e + ); + failed.push(ch.as_str()); + } + } + } + if succeeded.is_empty() { + let err_msg = if failed.is_empty() { + "No channels connected.".to_string() + } else { + format!("All channels failed: {}", failed.join(", ")) + }; + Err(ToolError::ExecutionFailed(err_msg)) + } else { tracing::info!( message_sent = true, - channel = %channel, + channels = ?succeeded, target = %target, attachments = attachment_count, - "Message sent via message tool" + "Message broadcast via message tool" + ); + let msg = format!( + "Broadcast message to {} (target: {})", + succeeded.join(", "), + target ); - let msg = format!("Sent message to {}:{}", channel, target); Ok(ToolOutput::text(msg, start.elapsed())) } - Err(e) => { - let available = self.channel_manager.channel_names().await.join(", "); - let err_msg = if available.is_empty() { - format!( - "Failed to send to {}:{}: {}. No channels connected.", - channel, target, e - ) - } else { - format!( - "Failed to send to {}:{}. Available channels: {}. Error: {}", - channel, target, available, e - ) - }; - Err(ToolError::ExecutionFailed(err_msg)) - } } } @@ -576,4 +624,90 @@ mod tests { ApprovalRequirement::Never, ); } + + #[tokio::test] + async fn message_tool_falls_back_to_job_metadata() { + // Regression: when no conversation context is set (e.g. routine full-job), + // the message tool should fall back to notify_channel/notify_user from + // JobContext metadata instead of returning "No target specified". + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + + let mut ctx = crate::context::JobContext::new("routine-job", "price alert"); + ctx.metadata = serde_json::json!({ + "notify_channel": "telegram", + "notify_user": "123456789", + }); + + // No set_context called — simulates a routine full-job worker + let result = tool + .execute(serde_json::json!({"content": "NEAR price is $5"}), &ctx) + .await; + + // Should fail at channel broadcast (no real channel), NOT at + // "No target specified and no active conversation" + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + !err.contains("No target specified"), + "Should not get 'No target specified' when metadata has notify_user, got: {}", + err + ); + assert!( + !err.contains("No channel specified"), + "Should not get 'No channel specified' when metadata has notify_channel, got: {}", + err + ); + } + + #[tokio::test] + async fn message_tool_no_metadata_still_errors() { + // When neither conversation context nor metadata is set, should still + // return a clear error (target resolution fails). + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + let ctx = crate::context::JobContext::new("orphan-job", "no notify config"); + + let result = tool + .execute(serde_json::json!({"content": "hello"}), &ctx) + .await; + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("No target specified"), + "Expected 'No target specified' error, got: {}", + err + ); + } + + #[tokio::test] + async fn message_tool_broadcasts_all_when_no_channel() { + // Regression: when notify.channel is None but notify_user is set, + // the message tool should attempt broadcast_all instead of erroring + // with "No channel specified". + let tool = MessageTool::new(Arc::new(ChannelManager::new())); + + let mut ctx = crate::context::JobContext::new("routine-job", "price alert"); + ctx.metadata = serde_json::json!({ + "notify_user": "123456789", + }); + + let result = tool + .execute(serde_json::json!({"content": "NEAR price is $5"}), &ctx) + .await; + + // Should fail because no channels are registered (empty ChannelManager), + // NOT because "No channel specified". + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + !err.contains("No channel specified"), + "Should not get 'No channel specified' when broadcasting, got: {}", + err + ); + assert!( + err.contains("No channels connected") || err.contains("All channels failed"), + "Expected channel delivery error, got: {}", + err + ); + } }