@@ -7,21 +7,22 @@ pub struct StreamWriter {
77 pending : Arc < RwLock < Vec < u8 > > > ,
88 done : Arc < RwLock < bool > > ,
99 // A way for the write side to signal new data to the stream side
10- write_index : Arc < RwLock < i64 > > ,
11- write_index_sender : Arc < tokio:: sync:: watch:: Sender < i64 > > ,
12- write_index_receiver : tokio:: sync:: watch:: Receiver < i64 > ,
10+ // ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
11+ // write_index: Arc<RwLock<i64>>,
12+ // write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
13+ // write_index_receiver: tokio::sync::watch::Receiver<i64>,
1314}
1415
1516impl StreamWriter {
1617 pub fn new ( ) -> Self {
17- let write_index = 0 ;
18- let ( tx, rx) = tokio:: sync:: watch:: channel ( write_index) ;
18+ // let write_index = 0;
19+ // let (tx, rx) = tokio::sync::watch::channel(write_index);
1920 Self {
2021 pending : Arc :: new ( RwLock :: new ( vec ! [ ] ) ) ,
2122 done : Arc :: new ( RwLock :: new ( false ) ) ,
22- write_index : Arc :: new ( RwLock :: new ( write_index) ) ,
23- write_index_sender : Arc :: new ( tx) ,
24- write_index_receiver : rx,
23+ // write_index: Arc::new(RwLock::new(write_index)),
24+ // write_index_sender: Arc::new(tx),
25+ // write_index_receiver: rx,
2526 }
2627 }
2728
@@ -34,11 +35,14 @@ impl StreamWriter {
3435 Err ( e) =>
3536 Err ( anyhow:: anyhow!( "Internal error: StreamWriter::append can't take lock: {}" , e) )
3637 } ;
37- {
38- let mut write_index = self . write_index . write ( ) . unwrap ( ) ;
39- * write_index = * write_index + 1 ;
40- self . write_index_sender . send ( * write_index) . unwrap ( ) ;
41- }
38+ // This was meant to wake up listener threads when there was new data but it ended up
39+ // just stalling until input was complete. TODO: investigate so we can get rid of the
40+ // duration-based polling.
41+ // {
42+ // let mut write_index = self.write_index.write().unwrap();
43+ // *write_index = *write_index + 1;
44+ // self.write_index_sender.send(*write_index).unwrap();
45+ // }
4246 result
4347 }
4448
@@ -68,6 +72,9 @@ impl StreamWriter {
6872 return Err ( anyhow:: anyhow!( "Internal error: StreamWriter::header_block can't take lock: {}" , e) ) ;
6973 } ,
7074 }
75+ // See comments on the as_stream loop, though using the change signal
76+ // blocked this *completely* until end of writing! (And everything else
77+ // waits on this.)
7178 tokio:: time:: sleep ( tokio:: time:: Duration :: from_micros ( 1 ) ) . await ;
7279 }
7380 }
@@ -82,21 +89,26 @@ impl StreamWriter {
8289 if self . is_done( ) {
8390 return ;
8491 } else {
85- // Not sure this is the smoothest way to do it. The oldest way was:
86- // tokio::time::sleep(tokio::time::Duration::from_micros(20)).await;
87- // which is a hideous kludge but subjectively felt quicker (but the
88- // number say not, so what is truth anyway)
89- match self . write_index_receiver. changed( ) . await {
90- Ok ( _) => continue ,
91- Err ( e) => {
92- // If this ever happens (which it, cough, shouldn't), it means all senders have
93- // closed, which _should_ mean we are done. Log the error
94- // but don't return it to the stream: the response as streamed so far
95- // _should_ be okay!
96- tracing:: error!( "StreamWriter::as_stream: error receiving write updates: {}" , e) ;
97- return ;
98- }
99- }
92+ // Not sure how to do this better. I tried using a signal that data
93+ // had changed (via tokio::sync::watch::channel()), but that effectively
94+ // blocked - we got the first chunk quickly but then it stalled waiting
95+ // for the change notification. Polling is awful (and this interval is
96+ // probably too aggressive) but I don't know how to get signalling
97+ // to work!
98+ tokio:: time:: sleep( tokio:: time:: Duration :: from_micros( 1 ) ) . await ;
99+
100+ // For the record: this is what I tried:
101+ // match self.write_index_receiver.changed().await {
102+ // Ok(_) => continue,
103+ // Err(e) => {
104+ // // If this ever happens (which it, cough, shouldn't), it means all senders have
105+ // // closed, which _should_ mean we are done. Log the error
106+ // // but don't return it to the stream: the response as streamed so far
107+ // // _should_ be okay!
108+ // tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
109+ // return;
110+ // }
111+ // }
100112 }
101113 } else {
102114 yield Ok ( v) ;
0 commit comments