@@ -8,6 +8,7 @@ use anyhow::Context;
88use futures:: StreamExt ;
99use redis_enterprise:: stats:: StatsHandler ;
1010use std:: time:: Duration ;
11+ use tokio:: signal;
1112
1213use super :: utils:: * ;
1314
@@ -135,19 +136,38 @@ async fn handle_database_stats_stream(
135136 let stats_handler = StatsHandler :: new ( client) ;
136137 let mut stream = stats_handler. stream_database ( database_id, Duration :: from_secs ( poll_interval) ) ;
137138
138- while let Some ( result) = stream. next ( ) . await {
139- match result {
140- Ok ( stats) => {
141- let stats_json =
142- serde_json:: to_value ( stats) . context ( "Failed to serialize stats" ) ?;
143- let data = handle_output ( stats_json, output_format, query) ?;
144- print_formatted_output ( data, output_format) ?;
145- println ! ( ) ; // Separator between polls
146- }
147- Err ( e) => {
148- eprintln ! ( "Error fetching stats: {}" , e) ;
139+ println ! (
140+ "Streaming database {} stats (Ctrl+C to stop)...\n " ,
141+ database_id
142+ ) ;
143+
144+ loop {
145+ tokio:: select! {
146+ // Handle Ctrl+C
147+ _ = signal:: ctrl_c( ) => {
148+ println!( "\n Stopping stats stream..." ) ;
149149 break ;
150150 }
151+ // Handle next stats entry
152+ result = stream. next( ) => {
153+ match result {
154+ Some ( Ok ( stats) ) => {
155+ let stats_json =
156+ serde_json:: to_value( stats) . context( "Failed to serialize stats" ) ?;
157+ let data = handle_output( stats_json, output_format, query) ?;
158+ print_formatted_output( data, output_format) ?;
159+ println!( ) ; // Separator between polls
160+ }
161+ Some ( Err ( e) ) => {
162+ eprintln!( "Error fetching stats: {}" , e) ;
163+ break ;
164+ }
165+ None => {
166+ // Stream ended
167+ break ;
168+ }
169+ }
170+ }
151171 }
152172 }
153173 Ok ( ( ) )
@@ -226,19 +246,32 @@ async fn handle_node_stats_stream(
226246 let stats_handler = StatsHandler :: new ( client) ;
227247 let mut stream = stats_handler. stream_node ( node_id, Duration :: from_secs ( poll_interval) ) ;
228248
229- while let Some ( result) = stream. next ( ) . await {
230- match result {
231- Ok ( stats) => {
232- let stats_json =
233- serde_json:: to_value ( stats) . context ( "Failed to serialize stats" ) ?;
234- let data = handle_output ( stats_json, output_format, query) ?;
235- print_formatted_output ( data, output_format) ?;
236- println ! ( ) ;
237- }
238- Err ( e) => {
239- eprintln ! ( "Error fetching stats: {}" , e) ;
249+ println ! ( "Streaming node {} stats (Ctrl+C to stop)...\n " , node_id) ;
250+
251+ loop {
252+ tokio:: select! {
253+ _ = signal:: ctrl_c( ) => {
254+ println!( "\n Stopping stats stream..." ) ;
240255 break ;
241256 }
257+ result = stream. next( ) => {
258+ match result {
259+ Some ( Ok ( stats) ) => {
260+ let stats_json =
261+ serde_json:: to_value( stats) . context( "Failed to serialize stats" ) ?;
262+ let data = handle_output( stats_json, output_format, query) ?;
263+ print_formatted_output( data, output_format) ?;
264+ println!( ) ;
265+ }
266+ Some ( Err ( e) ) => {
267+ eprintln!( "Error fetching stats: {}" , e) ;
268+ break ;
269+ }
270+ None => {
271+ break ;
272+ }
273+ }
274+ }
242275 }
243276 }
244277 Ok ( ( ) )
@@ -296,19 +329,32 @@ async fn handle_cluster_stats_stream(
296329 let stats_handler = StatsHandler :: new ( client) ;
297330 let mut stream = stats_handler. stream_cluster ( Duration :: from_secs ( poll_interval) ) ;
298331
299- while let Some ( result) = stream. next ( ) . await {
300- match result {
301- Ok ( stats) => {
302- let stats_json =
303- serde_json:: to_value ( stats) . context ( "Failed to serialize stats" ) ?;
304- let data = handle_output ( stats_json, output_format, query) ?;
305- print_formatted_output ( data, output_format) ?;
306- println ! ( ) ;
307- }
308- Err ( e) => {
309- eprintln ! ( "Error fetching stats: {}" , e) ;
332+ println ! ( "Streaming cluster stats (Ctrl+C to stop)...\n " ) ;
333+
334+ loop {
335+ tokio:: select! {
336+ _ = signal:: ctrl_c( ) => {
337+ println!( "\n Stopping stats stream..." ) ;
310338 break ;
311339 }
340+ result = stream. next( ) => {
341+ match result {
342+ Some ( Ok ( stats) ) => {
343+ let stats_json =
344+ serde_json:: to_value( stats) . context( "Failed to serialize stats" ) ?;
345+ let data = handle_output( stats_json, output_format, query) ?;
346+ print_formatted_output( data, output_format) ?;
347+ println!( ) ;
348+ }
349+ Some ( Err ( e) ) => {
350+ eprintln!( "Error fetching stats: {}" , e) ;
351+ break ;
352+ }
353+ None => {
354+ break ;
355+ }
356+ }
357+ }
312358 }
313359 }
314360 Ok ( ( ) )
0 commit comments