@@ -11,17 +11,23 @@ mod cmd {
1111 #[ cfg( feature = "producer-file-io" ) ]
1212 use std:: path:: PathBuf ;
1313
14+ use async_std:: io:: stdin;
1415 use async_trait:: async_trait;
16+ use fluvio_future:: io:: StreamExt ;
17+ use fluvio_sc_schema:: message:: MsgType ;
18+ use fluvio_sc_schema:: topic:: TopicSpec ;
1519 #[ cfg( feature = "producer-file-io" ) ]
1620 use futures:: future:: join_all;
1721 use clap:: Parser ;
22+ use tokio:: select;
1823 use tracing:: { error, warn} ;
1924 use humantime:: parse_duration;
2025 use anyhow:: Result ;
2126
2227 use fluvio:: {
23- Compression , Fluvio , FluvioError , TopicProducerPool , TopicProducerConfigBuilder , RecordKey ,
24- ProduceOutput , DeliverySemantic , SmartModuleContextData , Isolation , SmartModuleInvocation ,
28+ Compression , DeliverySemantic , Fluvio , FluvioAdmin , FluvioError , Isolation , ProduceOutput ,
29+ RecordKey , SmartModuleContextData , SmartModuleInvocation , TopicProducerConfigBuilder ,
30+ TopicProducerPool ,
2531 } ;
2632 use fluvio_extension_common:: Terminal ;
2733 use fluvio_types:: print_cli_ok;
@@ -243,16 +249,18 @@ mod cmd {
243249 . await ?,
244250 ) ;
245251
252+ let admin = fluvio. admin ( ) . await ;
253+
246254 #[ cfg( feature = "producer-file-io" ) ]
247255 if self . raw {
248256 self . process_raw_file ( & producer) . await ?;
249257 } else {
250- self . produce_lines ( producer. clone ( ) ) . await ?;
258+ self . produce_lines ( producer. clone ( ) , & admin ) . await ?;
251259 } ;
252260
253261 #[ cfg( not( feature = "producer-file-io" ) ) ]
254262 {
255- self . produce_lines ( producer. clone ( ) ) . await ?;
263+ self . produce_lines ( producer. clone ( ) , & admin ) . await ?;
256264 }
257265
258266 producer. flush ( ) . await ?;
@@ -315,7 +323,11 @@ mod cmd {
315323 }
316324 }
317325
318- async fn produce_lines ( & self , producer : Arc < TopicProducerPool > ) -> Result < ( ) > {
326+ async fn produce_lines (
327+ & self ,
328+ producer : Arc < TopicProducerPool > ,
329+ admin : & FluvioAdmin ,
330+ ) -> Result < ( ) > {
319331 #[ cfg( feature = "producer-file-io" ) ]
320332 if let Some ( path) = & self . file {
321333 let reader = BufReader :: new ( File :: open ( path) ?) ;
@@ -340,7 +352,7 @@ mod cmd {
340352 . collect :: < Result < Vec < _ > , _ > > ( ) ?;
341353 }
342354 } else {
343- self . producer_stdin ( & producer) . await ?
355+ self . producer_stdin ( & producer, admin ) . await ?
344356 }
345357
346358 #[ cfg( not( feature = "producer-file-io" ) ) ]
@@ -349,27 +361,55 @@ mod cmd {
349361 Ok ( ( ) )
350362 }
351363
352- async fn producer_stdin ( & self , producer : & Arc < TopicProducerPool > ) -> Result < ( ) > {
353- let mut lines = BufReader :: new ( std:: io:: stdin ( ) ) . lines ( ) ;
364+ async fn producer_stdin (
365+ & self ,
366+ producer : & Arc < TopicProducerPool > ,
367+ admin : & FluvioAdmin ,
368+ ) -> Result < ( ) > {
369+ use async_std:: io:: prelude:: * ;
370+ use async_std:: io:: BufReader ;
371+ let mut lines = BufReader :: new ( stdin ( ) ) . lines ( ) ;
372+ let mut partition_stream = admin. watch :: < TopicSpec > ( ) . await ?;
373+
354374 if self . interactive_mode ( ) {
355375 eprint ! ( "> " ) ;
356376 }
357377
358- while let Some ( Ok ( line) ) = lines. next ( ) {
359- let produce_output = self . produce_line ( producer, & line) . await ?;
360-
361- if let Some ( produce_output) = produce_output {
362- if self . delivery_semantic != DeliverySemantic :: AtMostOnce {
363- // ensure it was properly sent
364- produce_output. wait ( ) . await ?;
378+ loop {
379+ select ! {
380+ line = lines. next( ) => {
381+ if let Some ( Ok ( line) ) = line {
382+ let produce_output = self . produce_line( producer, & line) . await ?;
383+
384+ if let Some ( produce_output) = produce_output {
385+ if self . delivery_semantic != DeliverySemantic :: AtMostOnce {
386+ // ensure it was properly sent
387+ produce_output. wait( ) . await ?;
388+ }
389+ }
390+
391+ if self . interactive_mode( ) {
392+ print_cli_ok!( ) ;
393+ eprint!( "> " ) ;
394+ }
395+ } else {
396+ // When stdin is closed, we break the loop
397+ break ;
398+ }
399+ }
400+ stream = partition_stream. next( ) => {
401+ if let Some ( stream) = stream {
402+ let stream = stream?;
403+ for change in stream. inner( ) . changes {
404+ if change. header == MsgType :: DELETE && change. content. name == self . topic {
405+ return Err ( CliError :: TopicDeleted ( self . topic. clone( ) ) . into( ) ) ;
406+ }
407+ }
408+ }
365409 }
366- }
367-
368- if self . interactive_mode ( ) {
369- print_cli_ok ! ( ) ;
370- eprint ! ( "> " ) ;
371410 }
372411 }
412+
373413 Ok ( ( ) )
374414 }
375415
0 commit comments