Skip to content

Commit 8e1b5d5

Browse files
committed
first pass of swarm start fixes
1 parent d7e7d48 commit 8e1b5d5

File tree

8 files changed

+351
-58
lines changed

8 files changed

+351
-58
lines changed

src/cli/commands/swarm.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::application::{
66
agent_executor::AgentExecutor, resource_monitor::ResourceMonitor,
77
task_coordinator::TaskCoordinator, SwarmOrchestrator,
88
};
9-
use crate::cli::service::{SwarmService, TaskQueueService};
9+
use crate::cli::service::{SwarmService, TaskQueueServiceAdapter};
1010
use crate::infrastructure::claude::{ClaudeClientAdapter, ClaudeClientConfig};
1111
use crate::infrastructure::config::ConfigLoader;
1212
use crate::infrastructure::database::{AgentRepositoryImpl, TaskRepositoryImpl};
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
///
2121
/// Starts the swarm orchestrator with the specified maximum number of agents.
2222
pub async fn handle_start(
23-
_task_service: &TaskQueueService,
23+
_task_service: &TaskQueueServiceAdapter,
2424
max_agents: usize,
2525
json_output: bool,
2626
) -> Result<()> {
@@ -38,12 +38,15 @@ pub async fn handle_start(
3838
"status": "started",
3939
"max_agents": max_agents,
4040
"message": "Swarm orchestrator started successfully",
41-
"database_initialized": db_initialized
41+
"database_initialized": db_initialized,
42+
"log_file": ".abathur/swarm_daemon.log"
4243
});
4344
println!("{}", serde_json::to_string_pretty(&output)?);
4445
} else {
4546
println!("Starting swarm orchestrator with {} max agents...", max_agents);
4647
println!("Swarm orchestrator started successfully");
48+
println!();
49+
println!("Daemon logs are written to: .abathur/swarm_daemon.log");
4750
if !db_initialized {
4851
println!();
4952
println!("Note: Full orchestration requires database setup.");
@@ -56,12 +59,15 @@ pub async fn handle_start(
5659
if json_output {
5760
let output = json!({
5861
"status": "error",
59-
"message": format!("{}", e)
62+
"message": format!("{}", e),
63+
"log_file": ".abathur/swarm_daemon.log"
6064
});
6165
println!("{}", serde_json::to_string_pretty(&output)?);
6266
} else {
6367
println!("Failed to start swarm orchestrator: {}", e);
6468
println!();
69+
println!("Check logs at: .abathur/swarm_daemon.log");
70+
println!();
6571
println!("To enable full swarm functionality:");
6672
println!(" 1. Run 'abathur init' to initialize Abathur");
6773
println!(" 2. Ensure ANTHROPIC_API_KEY environment variable is set");
@@ -75,7 +81,7 @@ pub async fn handle_start(
7581
///
7682
/// Gracefully stops the swarm orchestrator.
7783
pub async fn handle_stop(
78-
_task_service: &TaskQueueService,
84+
_task_service: &TaskQueueServiceAdapter,
7985
json_output: bool,
8086
) -> Result<()> {
8187
let swarm_service = SwarmService::new();
@@ -113,15 +119,15 @@ pub async fn handle_stop(
113119
///
114120
/// Shows the current status of the swarm orchestrator.
115121
pub async fn handle_status(
116-
task_service: &TaskQueueService,
122+
task_service: &TaskQueueServiceAdapter,
117123
json_output: bool,
118124
) -> Result<()> {
119125
let swarm_service = SwarmService::new();
120126

121-
// Get swarm stats
122-
let swarm_stats = swarm_service.get_stats().await?;
127+
// Get swarm stats (includes task stats from database)
128+
let swarm_stats = swarm_service.get_stats(task_service).await?;
123129

124-
// Get queue stats
130+
// Get queue stats for detailed breakdown
125131
let queue_stats = task_service.get_queue_stats().await?;
126132

127133
if json_output {

src/cli/commands/task.rs

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::{Context, Result};
2-
use uuid::Uuid;
32

43
use crate::cli::models::TaskStatus;
54
use crate::cli::output::table::{format_queue_stats_table, format_task_table};
@@ -12,15 +11,25 @@ pub async fn handle_submit(
1211
agent_type: String,
1312
summary: Option<String>,
1413
priority: u8,
15-
dependencies: Vec<Uuid>,
14+
dependencies: Vec<String>,
1615
json: bool,
1716
) -> Result<()> {
17+
// Resolve dependency prefixes to full UUIDs
18+
let mut resolved_deps = Vec::new();
19+
for dep_prefix in &dependencies {
20+
let dep_id = service
21+
.resolve_task_id_prefix(dep_prefix)
22+
.await
23+
.context(format!("Failed to resolve dependency '{}'", dep_prefix))?;
24+
resolved_deps.push(dep_id);
25+
}
26+
1827
let task_id = service
1928
.submit_task(
2029
description.clone(),
2130
agent_type.clone(),
2231
priority,
23-
dependencies.clone(),
32+
resolved_deps.clone(),
2433
)
2534
.await
2635
.context("Failed to submit task")?;
@@ -31,7 +40,7 @@ pub async fn handle_submit(
3140
"description": description,
3241
"agent_type": agent_type,
3342
"priority": priority,
34-
"dependencies": dependencies,
43+
"dependencies": resolved_deps,
3544
});
3645
if let Some(summary_text) = &summary {
3746
output["summary"] = serde_json::json!(summary_text);
@@ -83,7 +92,13 @@ pub async fn handle_list(
8392
}
8493

8594
/// Handle task show command
86-
pub async fn handle_show(service: &TaskQueueServiceAdapter, task_id: Uuid, json: bool) -> Result<()> {
95+
pub async fn handle_show(service: &TaskQueueServiceAdapter, task_id_prefix: String, json: bool) -> Result<()> {
96+
// Resolve task ID prefix
97+
let task_id = service
98+
.resolve_task_id_prefix(&task_id_prefix)
99+
.await
100+
.context(format!("Failed to resolve task ID '{}'", task_id_prefix))?;
101+
87102
let task = service
88103
.get_task(task_id)
89104
.await
@@ -144,12 +159,12 @@ pub async fn handle_show(service: &TaskQueueServiceAdapter, task_id: Uuid, json:
144159
/// Handle task update command
145160
pub async fn handle_update(
146161
service: &TaskQueueServiceAdapter,
147-
task_ids: Vec<Uuid>,
162+
task_id_prefixes: Vec<String>,
148163
status: Option<String>,
149164
priority: Option<u8>,
150165
agent_type: Option<String>,
151-
add_dependency: Vec<Uuid>,
152-
remove_dependency: Vec<Uuid>,
166+
add_dependency: Vec<String>,
167+
remove_dependency: Vec<String>,
153168
retry: bool,
154169
cancel: bool,
155170
json: bool,
@@ -168,6 +183,35 @@ pub async fn handle_update(
168183
));
169184
}
170185

186+
// Resolve all task ID prefixes
187+
let mut task_ids = Vec::new();
188+
for prefix in &task_id_prefixes {
189+
let task_id = service
190+
.resolve_task_id_prefix(prefix)
191+
.await
192+
.context(format!("Failed to resolve task ID '{}'", prefix))?;
193+
task_ids.push(task_id);
194+
}
195+
196+
// Resolve dependency prefixes
197+
let mut resolved_add_deps = Vec::new();
198+
for dep_prefix in &add_dependency {
199+
let dep_id = service
200+
.resolve_task_id_prefix(dep_prefix)
201+
.await
202+
.context(format!("Failed to resolve add-dependency '{}'", dep_prefix))?;
203+
resolved_add_deps.push(dep_id);
204+
}
205+
206+
let mut resolved_remove_deps = Vec::new();
207+
for dep_prefix in &remove_dependency {
208+
let dep_id = service
209+
.resolve_task_id_prefix(dep_prefix)
210+
.await
211+
.context(format!("Failed to resolve remove-dependency '{}'", dep_prefix))?;
212+
resolved_remove_deps.push(dep_id);
213+
}
214+
171215
let mut results = Vec::new();
172216
let mut errors = Vec::new();
173217

@@ -179,8 +223,8 @@ pub async fn handle_update(
179223
status.as_deref(),
180224
priority,
181225
agent_type.clone(),
182-
add_dependency.clone(),
183-
remove_dependency.clone(),
226+
resolved_add_deps.clone(),
227+
resolved_remove_deps.clone(),
184228
retry,
185229
cancel,
186230
)
@@ -238,3 +282,36 @@ pub async fn handle_status(service: &TaskQueueServiceAdapter, json: bool) -> Res
238282

239283
Ok(())
240284
}
285+
286+
/// Handle task resolve command
287+
///
288+
/// Resolves dependencies for all Pending/Blocked tasks and updates them to Ready
289+
/// if their dependencies are satisfied.
290+
pub async fn handle_resolve(service: &TaskQueueServiceAdapter, json: bool) -> Result<()> {
291+
let count = service
292+
.resolve_dependencies()
293+
.await
294+
.context("Failed to resolve task dependencies")?;
295+
296+
if json {
297+
let output = serde_json::json!({
298+
"status": "success",
299+
"tasks_updated": count,
300+
"message": format!("{} task(s) updated to Ready status", count)
301+
});
302+
println!("{}", serde_json::to_string_pretty(&output)?);
303+
} else {
304+
println!("Task Dependency Resolution");
305+
println!("=========================");
306+
println!("Tasks updated to Ready: {}", count);
307+
308+
if count > 0 {
309+
println!("\nRun 'abathur task list --status ready' to view ready tasks.");
310+
} else {
311+
println!("\nNo tasks were ready to be updated.");
312+
println!("Check 'abathur task list --status pending' or '--status blocked' for pending tasks.");
313+
}
314+
}
315+
316+
Ok(())
317+
}

src/cli/service/swarm_service.rs

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@ use std::fs;
1010
use std::path::PathBuf;
1111
use std::process::{Command, Stdio};
1212

13-
/// Persisted swarm state
13+
/// Persisted swarm state - only tracks process information
14+
/// Task statistics are derived from the database, not stored here
1415
#[derive(Debug, Clone, Serialize, Deserialize)]
1516
struct SwarmStateFile {
1617
state: String,
1718
max_agents: usize,
18-
tasks_processed: u64,
19-
tasks_failed: u64,
2019
pid: Option<u32>,
2120
}
2221

@@ -25,8 +24,6 @@ impl Default for SwarmStateFile {
2524
Self {
2625
state: "Stopped".to_string(),
2726
max_agents: 0,
28-
tasks_processed: 0,
29-
tasks_failed: 0,
3027
pid: None,
3128
}
3229
}
@@ -129,32 +126,50 @@ impl SwarmService {
129126
let cwd = std::env::current_dir()
130127
.context("Failed to get current directory")?;
131128

132-
// Spawn background process with hidden --__daemon flag
129+
// Create log file for daemon stderr/stdout
130+
let log_path = cwd.join(".abathur/swarm_daemon.log");
131+
let log_file = std::fs::OpenOptions::new()
132+
.create(true)
133+
.append(true)
134+
.open(&log_path)
135+
.context("Failed to create daemon log file")?;
136+
137+
// Write log header
138+
use std::io::Write;
139+
let mut header_file = std::fs::OpenOptions::new()
140+
.append(true)
141+
.open(&log_path)?;
142+
writeln!(header_file, "\n=== Daemon starting at {} ===",
143+
chrono::Local::now().format("%Y-%m-%d %H:%M:%S"))?;
144+
145+
// Spawn background process with hidden --daemon flag
133146
// This flag will be handled in main.rs to run the orchestrator loop
134147
#[cfg(unix)]
135148
let child = Command::new(&exe_path)
136149
.arg("swarm")
137-
.arg("--__daemon")
150+
.arg("start")
151+
.arg("--daemon")
138152
.arg("--max-agents")
139153
.arg(max_agents.to_string())
140154
.current_dir(&cwd)
141155
.stdin(Stdio::null())
142-
.stdout(Stdio::null())
143-
.stderr(Stdio::null())
156+
.stdout(Stdio::from(log_file.try_clone()?))
157+
.stderr(Stdio::from(log_file))
144158
.spawn()
145159
.context("Failed to spawn background swarm process")?;
146160

147161
#[cfg(windows)]
148162
let child = Command::new(&exe_path)
149163
.arg("swarm")
150-
.arg("--__daemon")
164+
.arg("start")
165+
.arg("--daemon")
151166
.arg("--max-agents")
152167
.arg(max_agents.to_string())
153168
.current_dir(&cwd)
154169
.creation_flags(0x08000000) // CREATE_NO_WINDOW
155170
.stdin(Stdio::null())
156-
.stdout(Stdio::null())
157-
.stderr(Stdio::null())
171+
.stdout(Stdio::from(log_file.try_clone()?))
172+
.stderr(Stdio::from(log_file))
158173
.spawn()
159174
.context("Failed to spawn background swarm process")?;
160175

@@ -219,7 +234,12 @@ impl SwarmService {
219234
}
220235

221236
/// Get swarm statistics
222-
pub async fn get_stats(&self) -> Result<SwarmStats> {
237+
///
238+
/// Combines process state from state file with task statistics from database
239+
pub async fn get_stats(
240+
&self,
241+
task_service: &crate::cli::service::TaskQueueServiceAdapter,
242+
) -> Result<SwarmStats> {
223243
let mut state = Self::read_state()?;
224244

225245
// Check if the recorded PID is actually alive
@@ -252,13 +272,24 @@ impl SwarmService {
252272
_ => SwarmState::Stopped,
253273
};
254274

275+
// Get task statistics from database (source of truth)
276+
let queue_stats = task_service.get_queue_stats().await?;
277+
278+
// Calculate agent statistics
279+
let active_agents = queue_stats.running;
280+
let idle_agents = if swarm_state == SwarmState::Running {
281+
state.max_agents.saturating_sub(active_agents)
282+
} else {
283+
0
284+
};
285+
255286
Ok(SwarmStats {
256287
state: swarm_state,
257288
max_agents: state.max_agents,
258-
active_agents: 0,
259-
idle_agents: 0,
260-
tasks_processed: state.tasks_processed,
261-
tasks_failed: state.tasks_failed,
289+
active_agents,
290+
idle_agents,
291+
tasks_processed: queue_stats.completed as u64,
292+
tasks_failed: queue_stats.failed as u64,
262293
})
263294
}
264295

0 commit comments

Comments
 (0)