Skip to content

Commit 2a13296

Browse files
feat: Add streaming logs support with --follow flag (Issue #70) (#404)
* refactor: extract profile commands from main.rs to dedicated module - Create commands/profile.rs with all profile command handlers - Move 660 lines of profile logic out of main.rs - Reduce main.rs from 1,589 lines to 929 lines (41.5% reduction) - Split into focused handlers: list, path, show, set, remove, validate, default-enterprise, default-cloud - Update commands/mod.rs to include profile module - Remove unused Context import from main.rs - All 45 tests passing - No clippy warnings Main.rs now focuses on CLI parsing, initialization, and command routing. Closes #402 * feat: add streaming logs support with --follow flag Implements real-time log streaming for Redis Enterprise using polling-based approach. ## Changes ### redis-enterprise crate - Add tokio-stream, futures, async-stream dependencies - Implement stream_logs() method in LogsHandler - Uses polling with configurable interval to simulate streaming - Tracks last timestamp to avoid duplicate entries ### redisctl CLI - Add --follow/-f flag to 'logs list' command - Add --poll-interval flag (default: 2 seconds) - Implement handle_stream_logs for real-time output - Support Ctrl+C graceful shutdown - Format logs as they arrive in table format - Support JMESPath filtering on streamed logs ## Usage ```bash # Stream logs in real-time (like tail -f) redisctl enterprise logs list --follow # Custom poll interval redisctl enterprise logs list --follow --poll-interval 5 # With filtering redisctl enterprise logs list --follow -q 'type' ``` ## Implementation Details Since Redis Enterprise API doesn't natively support streaming, this: - Polls the /v1/logs endpoint at regular intervals - Tracks the last timestamp seen to fetch only new entries - Uses async-stream for ergonomic async generator syntax - Handles Ctrl+C for graceful shutdown ## Testing - All 67 tests passing (5 cloud, 17 enterprise, 45 redisctl) - No clippy warnings - Formatted with cargo fmt Addresses #70
1 parent f6f6ea9 commit 2a13296

File tree

7 files changed

+199
-0
lines changed

7 files changed

+199
-0
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ allow-dirty = ["ci"]
5252
anyhow = "1.0"
5353
thiserror = "2.0"
5454
tokio = { version = "1.40", features = ["full"] }
55+
tokio-stream = "0.1"
56+
futures = "0.3"
5557
serde = { version = "1.0", features = ["derive"] }
5658
tar = "0.4"
5759
flate2 = "1.0"

crates/redis-enterprise/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@ readme = "../../README.md"
1616

1717
[dependencies]
1818
async-trait = { workspace = true }
19+
async-stream = "0.3"
1920
reqwest = { workspace = true }
2021
serde = { workspace = true }
2122
serde_json = { workspace = true }
2223
serde_path_to_error = "0.1"
2324
serde_urlencoded = { workspace = true }
2425
thiserror = { workspace = true }
2526
tokio = { workspace = true }
27+
tokio-stream = { workspace = true }
28+
futures = { workspace = true }
2629
tracing = { workspace = true }
2730
anyhow = { workspace = true }
2831
base64 = { workspace = true }

crates/redis-enterprise/src/logs.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
//! - Query cluster logs
55
//! - Configure log levels
66
//! - Export log data
7+
//! - Stream logs in real-time (via polling)
78
89
use crate::client::RestClient;
910
use crate::error::Result;
11+
use futures::stream::Stream;
1012
use serde::{Deserialize, Serialize};
1113
use serde_json::Value;
14+
use std::pin::Pin;
15+
use std::time::Duration;
16+
use tokio::time::sleep;
1217

1318
/// Log entry (cluster event)
1419
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -67,4 +72,60 @@ impl LogsHandler {
6772
self.client.get("/v1/logs").await
6873
}
6974
}
75+
76+
/// Stream logs in real-time by polling
77+
///
78+
/// Since Redis Enterprise API doesn't support native streaming, this polls
79+
/// the logs endpoint at regular intervals and yields new log entries.
80+
///
81+
/// # Arguments
82+
/// * `poll_interval` - Time to wait between polls (default: 2 seconds)
83+
/// * `limit` - Maximum number of logs to fetch per poll (default: 100)
84+
///
85+
/// # Returns
86+
/// A stream of log entries that can be consumed with `while let Some(entry) = stream.next().await`
87+
pub fn stream_logs(
88+
&self,
89+
poll_interval: Duration,
90+
limit: Option<u32>,
91+
) -> Pin<Box<dyn Stream<Item = Result<LogEntry>> + Send + '_>> {
92+
Box::pin(async_stream::stream! {
93+
let mut last_time: Option<String> = None;
94+
95+
loop {
96+
// Build query - get logs after the last timestamp we saw
97+
let query = LogsQuery {
98+
stime: last_time.clone(),
99+
etime: None,
100+
order: Some("asc".to_string()), // Ascending so we get chronological order
101+
limit,
102+
offset: None,
103+
};
104+
105+
// Fetch logs
106+
match self.list(Some(query)).await {
107+
Ok(entries) => {
108+
if !entries.is_empty() {
109+
// Update last_time to the timestamp of the last entry
110+
if let Some(last_entry) = entries.last() {
111+
last_time = Some(last_entry.time.clone());
112+
}
113+
114+
// Yield each entry
115+
for entry in entries {
116+
yield Ok(entry);
117+
}
118+
}
119+
}
120+
Err(e) => {
121+
yield Err(e);
122+
break;
123+
}
124+
}
125+
126+
// Wait before next poll
127+
sleep(poll_interval).await;
128+
}
129+
})
130+
}
70131
}

crates/redisctl/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ clap = { workspace = true }
2727
clap_complete = "4.5"
2828
anyhow = { workspace = true }
2929
tokio = { workspace = true }
30+
tokio-stream = { workspace = true }
31+
futures = { workspace = true }
3032
tracing = { workspace = true }
3133
tracing-subscriber = { workspace = true }
3234
serde = { workspace = true }

crates/redisctl/src/commands/enterprise/logs.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,22 @@ use clap::Subcommand;
44
pub enum LogsCommands {
55
/// List cluster event logs
66
#[command(visible_alias = "ls")]
7+
#[command(after_help = "EXAMPLES:
8+
# Get recent logs
9+
redisctl enterprise logs list
10+
11+
# Get logs since a specific time
12+
redisctl enterprise logs list --since 2024-01-01T00:00:00Z
13+
14+
# Stream logs in real-time (like tail -f)
15+
redisctl enterprise logs list --follow
16+
17+
# Stream logs with custom poll interval
18+
redisctl enterprise logs list --follow --poll-interval 5
19+
20+
# Limit number of logs per fetch
21+
redisctl enterprise logs list --limit 50
22+
")]
723
List {
824
/// Start time (ISO 8601 format)
925
#[arg(long)]
@@ -24,5 +40,13 @@ pub enum LogsCommands {
2440
/// Number of events to skip (for pagination)
2541
#[arg(long)]
2642
offset: Option<u32>,
43+
44+
/// Follow log output (stream in real-time like tail -f)
45+
#[arg(long, short = 'f')]
46+
follow: bool,
47+
48+
/// Poll interval in seconds when following logs
49+
#[arg(long, default_value = "2", requires = "follow")]
50+
poll_interval: u64,
2751
},
2852
}

crates/redisctl/src/commands/enterprise/logs_impl.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use crate::cli::OutputFormat;
77
use crate::commands::enterprise::logs::LogsCommands;
88
use crate::connection::ConnectionManager;
99
use crate::error::Result as CliResult;
10+
use futures::StreamExt;
1011
use redis_enterprise::logs::LogsQuery;
12+
use std::time::Duration;
13+
use tokio::signal;
1114

1215
/// Parameters for log list operation
1316
struct LogListParams {
@@ -16,6 +19,8 @@ struct LogListParams {
1619
order: Option<String>,
1720
limit: Option<u32>,
1821
offset: Option<u32>,
22+
follow: bool,
23+
poll_interval: u64,
1924
}
2025

2126
pub async fn handle_logs_commands(
@@ -32,13 +37,17 @@ pub async fn handle_logs_commands(
3237
order,
3338
limit,
3439
offset,
40+
follow,
41+
poll_interval,
3542
} => {
3643
let params = LogListParams {
3744
since: since.clone(),
3845
until: until.clone(),
3946
order: order.clone(),
4047
limit: *limit,
4148
offset: *offset,
49+
follow: *follow,
50+
poll_interval: *poll_interval,
4251
};
4352
handle_list_logs(conn_mgr, profile_name, params, output_format, query).await
4453
}
@@ -55,6 +64,12 @@ async fn handle_list_logs(
5564
let client = conn_mgr.create_enterprise_client(profile_name).await?;
5665
let handler = redis_enterprise::LogsHandler::new(client);
5766

67+
// Handle streaming mode
68+
if params.follow {
69+
return handle_stream_logs(handler, params, output_format, query).await;
70+
}
71+
72+
// Normal (non-streaming) mode
5873
let logs_query = if params.since.is_some()
5974
|| params.until.is_some()
6075
|| params.order.is_some()
@@ -92,3 +107,79 @@ async fn handle_list_logs(
92107

93108
Ok(())
94109
}
110+
111+
async fn handle_stream_logs(
112+
handler: redis_enterprise::LogsHandler,
113+
params: LogListParams,
114+
output_format: OutputFormat,
115+
query: Option<&str>,
116+
) -> CliResult<()> {
117+
// Only support table/auto format for streaming (JSON/YAML don't make sense for streams)
118+
if !matches!(output_format, OutputFormat::Auto | OutputFormat::Table) {
119+
return Err(RedisCtlError::InvalidInput {
120+
message: "Streaming logs (--follow) only supports table output format".to_string(),
121+
});
122+
}
123+
124+
let poll_interval = Duration::from_secs(params.poll_interval);
125+
let mut stream = handler.stream_logs(poll_interval, params.limit);
126+
127+
println!("Streaming logs (Ctrl+C to stop)...\n");
128+
129+
loop {
130+
tokio::select! {
131+
// Handle Ctrl+C
132+
_ = signal::ctrl_c() => {
133+
println!("\nStopping log stream...");
134+
break;
135+
}
136+
// Handle next log entry
137+
entry_result = stream.next() => {
138+
match entry_result {
139+
Some(Ok(entry)) => {
140+
// Format and print the log entry
141+
let entry_json = serde_json::to_value(&entry)?;
142+
143+
// Apply JMESPath query if provided
144+
let output_data = if let Some(q) = query {
145+
crate::commands::enterprise::utils::apply_jmespath(&entry_json, q)?
146+
} else {
147+
entry_json
148+
};
149+
150+
// Print each entry as it arrives
151+
// For table format, print a simple formatted line
152+
if let Some(time) = output_data.get("time").and_then(|t| t.as_str()) {
153+
let event_type = output_data.get("type")
154+
.and_then(|t| t.as_str())
155+
.unwrap_or("unknown");
156+
157+
// Print timestamp and event type
158+
print!("[{}] {}", time, event_type);
159+
160+
// Print any extra fields
161+
if let Some(obj) = output_data.as_object() {
162+
for (key, value) in obj {
163+
if key != "time" && key != "type" {
164+
print!(" {}={}", key, value);
165+
}
166+
}
167+
}
168+
println!();
169+
}
170+
}
171+
Some(Err(e)) => {
172+
eprintln!("Error fetching logs: {}", e);
173+
break;
174+
}
175+
None => {
176+
// Stream ended
177+
break;
178+
}
179+
}
180+
}
181+
}
182+
}
183+
184+
Ok(())
185+
}

0 commit comments

Comments
 (0)