@@ -7,22 +7,21 @@ pub struct StreamWriter {
77 pending : Arc < RwLock < Vec < u8 > > > ,
88 done : Arc < RwLock < bool > > ,
99 // A way for the write side to signal new data to the stream side
10- // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
11- // write_index: Arc<RwLock<i64>>,
12- // write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
13- // write_index_receiver: tokio::sync::watch::Receiver<i64>,
10+ write_index : Arc < RwLock < i64 > > ,
11+ write_index_sender : Arc < tokio:: sync:: watch:: Sender < i64 > > ,
12+ write_index_receiver : tokio:: sync:: watch:: Receiver < i64 > ,
1413}
1514
1615impl StreamWriter {
1716 pub fn new ( ) -> Self {
18- // let write_index = 0;
19- // let (tx, rx) = tokio::sync::watch::channel(write_index);
17+ let write_index = 0 ;
18+ let ( tx, rx) = tokio:: sync:: watch:: channel ( write_index) ;
2019 Self {
2120 pending : Arc :: new ( RwLock :: new ( vec ! [ ] ) ) ,
2221 done : Arc :: new ( RwLock :: new ( false ) ) ,
23- // write_index: Arc::new(RwLock::new(write_index)),
24- // write_index_sender: Arc::new(tx),
25- // write_index_receiver: rx,
22+ write_index : Arc :: new ( RwLock :: new ( write_index) ) ,
23+ write_index_sender : Arc :: new ( tx) ,
24+ write_index_receiver : rx,
2625 }
2726 }
2827
@@ -35,14 +34,12 @@ impl StreamWriter {
3534 Err ( e) =>
3635 Err ( anyhow:: anyhow!( "Internal error: StreamWriter::append can't take lock: {}" , e) )
3736 } ;
38- // This was meant to wake up listener threads when there was new data but it ended up
39- // just stalling until input was complete. TODO: investigate so we can get rid of the
40- // duration-based polling.
41- // {
42- // let mut write_index = self.write_index.write().unwrap();
43- // *write_index = *write_index + 1;
44- // self.write_index_sender.send(*write_index).unwrap();
45- // }
37+ {
38+ let mut write_index = self . write_index . write ( ) . unwrap ( ) ;
39+ * write_index = * write_index + 1 ;
40+ self . write_index_sender . send ( * write_index) . unwrap ( ) ;
41+ drop ( write_index) ;
42+ }
4643 result
4744 }
4845
@@ -89,28 +86,33 @@ impl StreamWriter {
8986 if self . is_done( ) {
9087 return ;
9188 } else {
92- // Not sure how to do this better. I tried using a signal that data
93- // had changed (via tokio::sync::watch::channel()), but that effectively
94- // blocked - we got the first chunk quickly but then it stalled waiting
95- // for the change notification. Polling is awful (and this interval is
96- // probably too aggressive) but I don't know how to get signalling
97- // to work!
98- tokio:: time:: sleep( tokio:: time:: Duration :: from_micros( 1 ) ) . await ;
99-
100- // For the record: this is what I tried:
101- // match self.write_index_receiver.changed().await {
102- // Ok(_) => continue,
103- // Err(e) => {
104- // // If this ever happens (which it, cough, shouldn't), it means all senders have
105- // // closed, which _should_ mean we are done. Log the error
106- // // but don't return it to the stream: the response as streamed so far
107- // // _should_ be okay!
108- // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
109- // return;
110- // }
111- // }
89+ // This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
90+ // If we go straight to the 'changed().await' then the pipeline seems to stall after
91+ // a few dozen writes, and everything else gets held up until the entire output
92+ // has been written. There may be better ways of doing this; I haven't found them
93+ // yet.
94+ //
95+ // (By the way, having the timer but not the change notification also worked. But if
96+ // writes came slowly, that would result in very aggressive polling. So hopefully this
97+ // gives us the best of both worlds.)
98+ tokio:: time:: sleep( tokio:: time:: Duration :: from_nanos( 10 ) ) . await ;
99+
100+ match self . write_index_receiver. changed( ) . await {
101+ Ok ( _) => continue ,
102+ Err ( e) => {
103+ // If this ever happens (which it, cough, shouldn't), it means all senders have
104+ // closed, which _should_ mean we are done. Log the error
105+ // but don't return it to the stream: the response as streamed so far
106+ // _should_ be okay!
107+ tracing:: error!( "StreamWriter::as_stream: error receiving write updates: {}" , e) ;
108+ return ;
109+ }
110+ }
112111 }
113112 } else {
113+ // This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
114+ // See the comment on the 'empty buffer' case.
115+ tokio:: time:: sleep( tokio:: time:: Duration :: from_nanos( 10 ) ) . await ;
114116 yield Ok ( v) ;
115117 }
116118 } ,
0 commit comments