Skip to content

Commit be4e063

Browse files
feat(enterprise): add database watch command for real-time status monitoring (#458)
Implements Phase 2 of issue #405 - Database Lifecycle Event Monitoring ## Library Changes (redis-enterprise) - Add watch_database() method to DatabaseHandler/BdbHandler - Polls database endpoint and tracks status changes - Returns stream of (DatabaseInfo, Option<String>) tuples - Second element is Some(old_status) when status changes occur ## CLI Changes (redisctl) - New command: redisctl enterprise database watch <id> - Displays timestamped status updates with key metrics - Highlights status transitions prominently - Supports JMESPath filtering via -q flag - Ctrl+C for graceful shutdown - Configurable poll interval (default: 5 seconds) ## Use Cases - Monitor database during upgrades/migrations - Watch backup/restore operations - Track scaling operations - Detect and alert on status changes ## Example Usage ```bash # Watch database with default 5-second polling redisctl enterprise database watch 1 # Faster polling for critical operations redisctl enterprise database watch 1 --poll-interval 2 # Filter output with JMESPath redisctl enterprise database watch 1 -q '{name: name, status: status}' ``` ## Example Output ``` Watching database 1 (Ctrl+C to stop)... [10:30:00] Database 1: active | mem: 2.10GB | shards: 1 [10:30:15] Database 1: active -> updating (TRANSITION) Memory: 2.10GB / 5.00GB (42.0%) Shards: 1 [10:32:30] Database 1: updating -> active (TRANSITION) Memory: 2.15GB / 5.00GB (43.0%) Shards: 1 ``` Addresses #405 (Phase 2)
1 parent 81fba53 commit be4e063

File tree

4 files changed

+200
-0
lines changed

4 files changed

+200
-0
lines changed

crates/redis-enterprise/src/bdb.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,19 @@
7474
7575
use crate::client::RestClient;
7676
use crate::error::Result;
77+
use futures::stream::Stream;
7778
use serde::{Deserialize, Serialize};
7879
use serde_json::Value;
80+
use std::pin::Pin;
81+
use std::time::Duration;
82+
use tokio::time::sleep;
7983
use typed_builder::TypedBuilder;
8084

8185
// Aliases for easier use
8286
pub type Database = DatabaseInfo;
8387
pub type BdbHandler = DatabaseHandler;
88+
pub type DatabaseWatchStream<'a> =
89+
Pin<Box<dyn Stream<Item = Result<(DatabaseInfo, Option<String>)>> + Send + 'a>>;
8490

8591
/// Response from database action operations
8692
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -981,4 +987,84 @@ impl DatabaseHandler {
981987
pub async fn create_v2(&self, request: Value) -> Result<DatabaseInfo> {
982988
self.client.post("/v2/bdbs", &request).await
983989
}
990+
991+
/// Watch database status changes in real-time
992+
///
993+
/// Polls the database endpoint and yields updates when status changes occur.
994+
/// Useful for monitoring database operations like upgrades, migrations, backups, etc.
995+
///
996+
/// # Arguments
997+
/// * `uid` - Database ID to watch
998+
/// * `poll_interval` - Time to wait between polls
999+
///
1000+
/// # Returns
1001+
/// A stream of `(DatabaseInfo, Option<String>)` tuples where:
1002+
/// - `DatabaseInfo` - Current database state
1003+
/// - `Option<String>` - Previous status (None on first poll, Some on status change)
1004+
///
1005+
/// # Example
1006+
/// ```no_run
1007+
/// use redis_enterprise::{EnterpriseClient, BdbHandler as DatabaseHandler};
1008+
/// use futures::StreamExt;
1009+
/// use std::time::Duration;
1010+
///
1011+
/// # async fn example(client: EnterpriseClient) -> Result<(), Box<dyn std::error::Error>> {
1012+
/// let handler = DatabaseHandler::new(client);
1013+
/// let mut stream = handler.watch_database(1, Duration::from_secs(5));
1014+
///
1015+
/// while let Some(result) = stream.next().await {
1016+
/// match result {
1017+
/// Ok((db_info, prev_status)) => {
1018+
/// if let Some(old_status) = prev_status {
1019+
/// println!("Status changed: {} -> {}", old_status, db_info.status.unwrap_or_default());
1020+
/// } else {
1021+
/// println!("Initial status: {}", db_info.status.unwrap_or_default());
1022+
/// }
1023+
/// }
1024+
/// Err(e) => eprintln!("Error: {}", e),
1025+
/// }
1026+
/// }
1027+
/// # Ok(())
1028+
/// # }
1029+
/// ```
1030+
pub fn watch_database(&self, uid: u32, poll_interval: Duration) -> DatabaseWatchStream<'_> {
1031+
Box::pin(async_stream::stream! {
1032+
let mut last_status: Option<String> = None;
1033+
1034+
loop {
1035+
match self.info(uid).await {
1036+
Ok(db_info) => {
1037+
let current_status = db_info.status.clone();
1038+
1039+
// Check if status changed
1040+
let status_changed = match (&last_status, &current_status) {
1041+
(Some(old), Some(new)) => old != new,
1042+
(None, Some(_)) => false, // First poll, not a change
1043+
(Some(_), None) => true, // Status disappeared
1044+
(None, None) => false,
1045+
};
1046+
1047+
// Yield the database info with previous status if changed
1048+
if status_changed {
1049+
yield Ok((db_info, last_status.clone()));
1050+
} else if last_status.is_none() {
1051+
// First poll - always yield
1052+
yield Ok((db_info, None));
1053+
} else {
1054+
// Status unchanged - yield current state for monitoring
1055+
yield Ok((db_info, None));
1056+
}
1057+
1058+
last_status = current_status;
1059+
}
1060+
Err(e) => {
1061+
yield Err(e);
1062+
break;
1063+
}
1064+
}
1065+
1066+
sleep(poll_interval).await;
1067+
}
1068+
})
1069+
}
9841070
}

crates/redisctl/src/cli/enterprise.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,15 @@ NOTE: Memory size is in bytes. Common values:
451451
force: bool,
452452
},
453453

454+
/// Watch database status changes in real-time
455+
Watch {
456+
/// Database ID
457+
id: u32,
458+
/// Poll interval in seconds
459+
#[arg(long, default_value = "5")]
460+
poll_interval: u64,
461+
},
462+
454463
/// Export database
455464
Export {
456465
/// Database ID

crates/redisctl/src/commands/enterprise/database.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ pub async fn handle_database_command(
7474
)
7575
.await
7676
}
77+
EnterpriseDatabaseCommands::Watch { id, poll_interval } => {
78+
database_impl::watch_database(conn_mgr, profile_name, *id, *poll_interval, query).await
79+
}
7780
EnterpriseDatabaseCommands::Export { id, data } => {
7881
database_impl::export_database(conn_mgr, profile_name, *id, data, output_format, query)
7982
.await

crates/redisctl/src/commands/enterprise/database_impl.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,108 @@ pub async fn delete_database(
208208
Ok(())
209209
}
210210

211+
/// Watch database status changes
212+
pub async fn watch_database(
213+
conn_mgr: &ConnectionManager,
214+
profile_name: Option<&str>,
215+
id: u32,
216+
poll_interval: u64,
217+
query: Option<&str>,
218+
) -> CliResult<()> {
219+
use futures::StreamExt;
220+
use tokio::signal;
221+
222+
let client = conn_mgr.create_enterprise_client(profile_name).await?;
223+
let handler = redis_enterprise::BdbHandler::new(client);
224+
let mut stream = handler.watch_database(id, std::time::Duration::from_secs(poll_interval));
225+
226+
println!("Watching database {} (Ctrl+C to stop)...\n", id);
227+
228+
loop {
229+
tokio::select! {
230+
_ = signal::ctrl_c() => {
231+
println!("\nStopping database watch...");
232+
break;
233+
}
234+
result = stream.next() => {
235+
match result {
236+
Some(Ok((db_info, prev_status))) => {
237+
let current_status = db_info.status.as_deref().unwrap_or("unknown");
238+
let timestamp = chrono::Utc::now().format("%H:%M:%S");
239+
240+
// Check if this is a status change
241+
if let Some(old_status) = prev_status {
242+
// Status transition detected
243+
println!(
244+
"[{}] Database {}: {} -> {} (TRANSITION)",
245+
timestamp, id, old_status, current_status
246+
);
247+
248+
// Show key metrics during transition
249+
if let Some(memory_used) = db_info.memory_used
250+
&& let Some(memory_size) = db_info.memory_size {
251+
let usage_pct = (memory_used as f64 / memory_size as f64) * 100.0;
252+
println!(" Memory: {} / {} ({:.1}%)",
253+
format_bytes(memory_used),
254+
format_bytes(memory_size),
255+
usage_pct
256+
);
257+
}
258+
259+
if let Some(shards) = db_info.shards_count {
260+
println!(" Shards: {}", shards);
261+
}
262+
} else {
263+
// Regular status update (no change)
264+
print!("[{}] Database {}: {}", timestamp, id, current_status);
265+
266+
// Apply JMESPath query if provided
267+
if query.is_some() {
268+
let db_json = serde_json::to_value(&db_info)
269+
.map_err(|e| RedisCtlError::from(anyhow::anyhow!("Serialization error: {}", e)))?;
270+
let filtered = handle_output(db_json, OutputFormat::Json, query)?;
271+
print!(" | {}", serde_json::to_string(&filtered)?);
272+
} else {
273+
// Show brief metrics
274+
if let Some(memory_used) = db_info.memory_used {
275+
print!(" | mem: {}", format_bytes(memory_used));
276+
}
277+
if let Some(shards) = db_info.shards_count {
278+
print!(" | shards: {}", shards);
279+
}
280+
}
281+
println!();
282+
}
283+
}
284+
Some(Err(e)) => {
285+
eprintln!("Error watching database: {}", e);
286+
break;
287+
}
288+
None => {
289+
break;
290+
}
291+
}
292+
}
293+
}
294+
}
295+
296+
Ok(())
297+
}
298+
299+
/// Format bytes into human-readable format
300+
fn format_bytes(bytes: u64) -> String {
301+
const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
302+
let mut size = bytes as f64;
303+
let mut unit_index = 0;
304+
305+
while size >= 1024.0 && unit_index < UNITS.len() - 1 {
306+
size /= 1024.0;
307+
unit_index += 1;
308+
}
309+
310+
format!("{:.2}{}", size, UNITS[unit_index])
311+
}
312+
211313
/// Export database
212314
pub async fn export_database(
213315
conn_mgr: &ConnectionManager,

0 commit comments

Comments
 (0)