Skip to content

Commit c61c6ee

Browse files
feat(enterprise): implement CRDB task management commands (#169) (#291)
- Add List, Get, Status, Progress, and Logs commands for CRDB tasks - Add filtering by status, type, and CRDB UID - Add list-by-crdb command to view tasks for specific CRDB - Add task control operations: Cancel, Retry, Pause, Resume - Support graceful handling of operations that may not be available - Add comprehensive mdBook documentation with examples - Successfully tested list command against Docker environment (returns empty tasks array as expected)
1 parent 9b0c33c commit c61c6ee

File tree

6 files changed

+762
-0
lines changed

6 files changed

+762
-0
lines changed

crates/redisctl/src/cli.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,9 @@ pub enum EnterpriseCommands {
10041004
/// Active-Active database (CRDB) operations
10051005
#[command(subcommand)]
10061006
Crdb(EnterpriseCrdbCommands),
1007+
/// CRDB task operations
1008+
#[command(subcommand, name = "crdb-task")]
1009+
CrdbTask(crate::commands::enterprise::crdb_task::CrdbTaskCommands),
10071010

10081011
/// Job scheduler operations
10091012
#[command(subcommand, name = "job-scheduler")]
Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
use anyhow::Context;
2+
use clap::Subcommand;
3+
4+
use crate::cli::OutputFormat;
5+
use crate::connection::ConnectionManager;
6+
use crate::error::Result as CliResult;
7+
8+
#[derive(Debug, Clone, Subcommand)]
9+
pub enum CrdbTaskCommands {
10+
/// List all CRDB tasks
11+
List {
12+
/// Filter by task status
13+
#[arg(long)]
14+
status: Option<String>,
15+
16+
/// Filter by task type
17+
#[arg(long, name = "type")]
18+
task_type: Option<String>,
19+
20+
/// Filter by CRDB UID
21+
#[arg(long)]
22+
crdb_uid: Option<u32>,
23+
},
24+
25+
/// Get CRDB task details
26+
Get {
27+
/// Task ID
28+
task_id: String,
29+
},
30+
31+
/// Get task status
32+
Status {
33+
/// Task ID
34+
task_id: String,
35+
},
36+
37+
/// Get task progress information
38+
Progress {
39+
/// Task ID
40+
task_id: String,
41+
},
42+
43+
/// Get task logs
44+
Logs {
45+
/// Task ID
46+
task_id: String,
47+
},
48+
49+
/// List tasks for a specific CRDB
50+
#[command(name = "list-by-crdb")]
51+
ListByCrdb {
52+
/// CRDB UID
53+
crdb_uid: u32,
54+
55+
/// Filter by task status
56+
#[arg(long)]
57+
status: Option<String>,
58+
59+
/// Filter by task type
60+
#[arg(long, name = "type")]
61+
task_type: Option<String>,
62+
},
63+
64+
/// Cancel a running task
65+
Cancel {
66+
/// Task ID
67+
task_id: String,
68+
69+
/// Force cancellation without confirmation
70+
#[arg(short, long)]
71+
force: bool,
72+
},
73+
74+
/// Retry a failed task
75+
Retry {
76+
/// Task ID
77+
task_id: String,
78+
},
79+
80+
/// Pause a running task
81+
Pause {
82+
/// Task ID
83+
task_id: String,
84+
},
85+
86+
/// Resume a paused task
87+
Resume {
88+
/// Task ID
89+
task_id: String,
90+
},
91+
}
92+
93+
impl CrdbTaskCommands {
94+
#[allow(dead_code)]
95+
pub async fn execute(
96+
&self,
97+
conn_mgr: &ConnectionManager,
98+
profile_name: Option<&str>,
99+
output_format: OutputFormat,
100+
query: Option<&str>,
101+
) -> CliResult<()> {
102+
let client = conn_mgr.create_enterprise_client(profile_name).await?;
103+
104+
match self {
105+
CrdbTaskCommands::List {
106+
status,
107+
task_type,
108+
crdb_uid,
109+
} => {
110+
// Get all CRDB tasks
111+
let mut response: serde_json::Value = client
112+
.get("/crdb_tasks")
113+
.await
114+
.context("Failed to list CRDB tasks")?;
115+
116+
// Apply filters if provided
117+
if (status.is_some() || task_type.is_some() || crdb_uid.is_some())
118+
&& let Some(tasks) = response["tasks"].as_array_mut()
119+
{
120+
tasks.retain(|task| {
121+
let mut keep = true;
122+
123+
if let Some(s) = status {
124+
keep = keep && task["status"].as_str() == Some(s);
125+
}
126+
127+
if let Some(t) = task_type {
128+
keep = keep && task["type"].as_str() == Some(t);
129+
}
130+
131+
if let Some(uid) = crdb_uid {
132+
keep = keep
133+
&& task["crdb_guid"]
134+
.as_str()
135+
.and_then(|guid| guid.split('-').nth(0))
136+
.and_then(|id| id.parse::<u32>().ok())
137+
== Some(*uid);
138+
}
139+
140+
keep
141+
});
142+
}
143+
144+
let output_data = if let Some(q) = query {
145+
super::utils::apply_jmespath(&response, q)?
146+
} else {
147+
response
148+
};
149+
super::utils::print_formatted_output(output_data, output_format)?;
150+
}
151+
152+
CrdbTaskCommands::Get { task_id } => {
153+
let response: serde_json::Value = client
154+
.get(&format!("/crdb_tasks/{}", task_id))
155+
.await
156+
.context("Failed to get CRDB task")?;
157+
158+
let output_data = if let Some(q) = query {
159+
super::utils::apply_jmespath(&response, q)?
160+
} else {
161+
response
162+
};
163+
super::utils::print_formatted_output(output_data, output_format)?;
164+
}
165+
166+
CrdbTaskCommands::Status { task_id } => {
167+
let response: serde_json::Value = client
168+
.get(&format!("/crdb_tasks/{}", task_id))
169+
.await
170+
.context("Failed to get task status")?;
171+
172+
// Extract just the status field
173+
let status = &response["status"];
174+
175+
let output_data = if let Some(q) = query {
176+
super::utils::apply_jmespath(status, q)?
177+
} else {
178+
status.clone()
179+
};
180+
super::utils::print_formatted_output(output_data, output_format)?;
181+
}
182+
183+
CrdbTaskCommands::Progress { task_id } => {
184+
let response: serde_json::Value = client
185+
.get(&format!("/crdb_tasks/{}", task_id))
186+
.await
187+
.context("Failed to get task progress")?;
188+
189+
// Extract progress information
190+
let progress = serde_json::json!({
191+
"status": response["status"],
192+
"progress": response["progress"],
193+
"progress_percent": response["progress_percent"],
194+
"start_time": response["start_time"],
195+
"end_time": response["end_time"],
196+
});
197+
198+
let output_data = if let Some(q) = query {
199+
super::utils::apply_jmespath(&progress, q)?
200+
} else {
201+
progress
202+
};
203+
super::utils::print_formatted_output(output_data, output_format)?;
204+
}
205+
206+
CrdbTaskCommands::Logs { task_id } => {
207+
let response: serde_json::Value = client
208+
.get(&format!("/crdb_tasks/{}", task_id))
209+
.await
210+
.context("Failed to get task logs")?;
211+
212+
// Extract logs if available
213+
let logs = if response["logs"].is_null() {
214+
serde_json::json!({
215+
"task_id": task_id,
216+
"logs": "No logs available",
217+
"status": response["status"]
218+
})
219+
} else {
220+
response["logs"].clone()
221+
};
222+
223+
let output_data = if let Some(q) = query {
224+
super::utils::apply_jmespath(&logs, q)?
225+
} else {
226+
logs
227+
};
228+
super::utils::print_formatted_output(output_data, output_format)?;
229+
}
230+
231+
CrdbTaskCommands::ListByCrdb {
232+
crdb_uid,
233+
status,
234+
task_type,
235+
} => {
236+
// Get all CRDB tasks
237+
let mut response: serde_json::Value = client
238+
.get("/crdb_tasks")
239+
.await
240+
.context("Failed to list CRDB tasks")?;
241+
242+
// Filter by CRDB UID and optional status/type
243+
if let Some(tasks) = response["tasks"].as_array_mut() {
244+
tasks.retain(|task| {
245+
let mut keep = task["crdb_guid"]
246+
.as_str()
247+
.and_then(|guid| guid.split('-').nth(0))
248+
.and_then(|id| id.parse::<u32>().ok())
249+
== Some(*crdb_uid);
250+
251+
if let Some(s) = status {
252+
keep = keep && task["status"].as_str() == Some(s);
253+
}
254+
255+
if let Some(t) = task_type {
256+
keep = keep && task["type"].as_str() == Some(t);
257+
}
258+
259+
keep
260+
});
261+
}
262+
263+
let output_data = if let Some(q) = query {
264+
super::utils::apply_jmespath(&response, q)?
265+
} else {
266+
response
267+
};
268+
super::utils::print_formatted_output(output_data, output_format)?;
269+
}
270+
271+
CrdbTaskCommands::Cancel { task_id, force } => {
272+
if !force && !super::utils::confirm_action(&format!("Cancel task {}?", task_id))? {
273+
return Ok(());
274+
}
275+
276+
// Cancel the task
277+
let _: serde_json::Value = client
278+
.post(
279+
&format!("/crdb_tasks/{}/actions/cancel", task_id),
280+
&serde_json::json!({}),
281+
)
282+
.await
283+
.context("Failed to cancel task")?;
284+
285+
println!("Task {} cancelled successfully", task_id);
286+
}
287+
288+
CrdbTaskCommands::Retry { task_id } => {
289+
// Note: Retry endpoint may not exist in all versions
290+
// Try to retry the task
291+
let result: Result<serde_json::Value, _> = client
292+
.post(
293+
&format!("/crdb_tasks/{}/actions/retry", task_id),
294+
&serde_json::json!({}),
295+
)
296+
.await;
297+
298+
match result {
299+
Ok(_) => println!("Task {} retry initiated", task_id),
300+
Err(_) => {
301+
// If retry endpoint doesn't exist, provide alternative
302+
println!("Retry operation not available for task {}", task_id);
303+
println!("Consider creating a new task with the same configuration");
304+
}
305+
}
306+
}
307+
308+
CrdbTaskCommands::Pause { task_id } => {
309+
// Note: Pause endpoint may not exist in all versions
310+
let result: Result<serde_json::Value, _> = client
311+
.post(
312+
&format!("/crdb_tasks/{}/actions/pause", task_id),
313+
&serde_json::json!({}),
314+
)
315+
.await;
316+
317+
match result {
318+
Ok(_) => println!("Task {} paused", task_id),
319+
Err(_) => {
320+
println!("Pause operation not available for task {}", task_id);
321+
println!("Task pause may not be supported for this task type");
322+
}
323+
}
324+
}
325+
326+
CrdbTaskCommands::Resume { task_id } => {
327+
// Note: Resume endpoint may not exist in all versions
328+
let result: Result<serde_json::Value, _> = client
329+
.post(
330+
&format!("/crdb_tasks/{}/actions/resume", task_id),
331+
&serde_json::json!({}),
332+
)
333+
.await;
334+
335+
match result {
336+
Ok(_) => println!("Task {} resumed", task_id),
337+
Err(_) => {
338+
println!("Resume operation not available for task {}", task_id);
339+
println!("Task resume may not be supported for this task type");
340+
}
341+
}
342+
}
343+
}
344+
345+
Ok(())
346+
}
347+
}
348+
349+
#[allow(dead_code)]
350+
pub async fn handle_crdb_task_command(
351+
conn_mgr: &ConnectionManager,
352+
profile_name: Option<&str>,
353+
crdb_task_cmd: CrdbTaskCommands,
354+
output_format: OutputFormat,
355+
query: Option<&str>,
356+
) -> CliResult<()> {
357+
crdb_task_cmd
358+
.execute(conn_mgr, profile_name, output_format, query)
359+
.await
360+
}

crates/redisctl/src/commands/enterprise/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod cluster;
66
pub mod cluster_impl;
77
pub mod crdb;
88
pub mod crdb_impl;
9+
pub mod crdb_task;
910
pub mod database;
1011
pub mod database_impl;
1112
pub mod diagnostics;

crates/redisctl/src/main.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,16 @@ async fn execute_enterprise_command(
274274
)
275275
.await
276276
}
277+
CrdbTask(crdb_task_cmd) => {
278+
commands::enterprise::crdb_task::handle_crdb_task_command(
279+
conn_mgr,
280+
profile,
281+
crdb_task_cmd.clone(),
282+
output,
283+
query,
284+
)
285+
.await
286+
}
277287
JobScheduler(job_scheduler_cmd) => {
278288
commands::enterprise::job_scheduler::handle_job_scheduler_command(
279289
conn_mgr,

0 commit comments

Comments
 (0)