@@ -7,8 +7,7 @@ use owo_colors::OwoColorize;
77use parsers:: firehose_parser_ack_nak;
88use serial:: setup_serial_device;
99use std:: cmp:: min;
10- use std:: io:: Read ;
11- use std:: io:: Write ;
10+ use std:: io:: { Read , Write } ;
1211use std:: str:: { self , FromStr } ;
1312use types:: FirehoseResetMode ;
1413use types:: FirehoseStatus ;
@@ -73,13 +72,13 @@ pub fn firehose_read<T: QdlChan>(
7372 channel : & mut T ,
7473 response_parser : fn ( & mut T , & IndexMap < String , String > ) -> Result < FirehoseStatus , anyhow:: Error > ,
7574) -> Result < FirehoseStatus , anyhow:: Error > {
76- // xml_buffer_size comes from the device, so the XML should always fit
77- let mut buf = vec ! [ 0u8 ; channel. fh_config( ) . xml_buf_size] ;
7875 let mut got_any_data = false ;
76+ let mut pending: Vec < u8 > = Vec :: new ( ) ;
7977
8078 loop {
81- let bytes_read = match channel. read ( & mut buf) {
82- Ok ( n) => n,
79+ // Use BufRead to peek at available data
80+ let available = match channel. fill_buf ( ) {
81+ Ok ( buf) => buf,
8382 Err ( e) => match e. kind ( ) {
8483 // In some cases (like with welcome messages), there's no acking
8584 // and a timeout is the "end of data" marker instead..
@@ -96,15 +95,40 @@ pub fn firehose_read<T: QdlChan>(
9695
9796 got_any_data = true ;
9897
99- let xml_fragments_indices: Vec < _ > = str:: from_utf8 ( & buf[ ..bytes_read] ) ?
100- . match_indices ( "<?xml" )
101- . map ( |s| s. 0 )
102- . collect ( ) ;
98+ // When channel is a non-packetized BufRead (e.g. serial) XML documents
99+ // are not separated from each other, or from rawmode data. Search for
100+ // </data> in the BufRead stream to find the end of the current
101+ // message.
102+ let data_end_marker = b"</data>" ;
103+
104+ let pending_length = pending. len ( ) ;
105+ pending. extend_from_slice ( available) ;
106+
107+ // Search for the end marker in the pending data
108+ let end_pos = pending
109+ . windows ( data_end_marker. len ( ) )
110+ . position ( |window| window == data_end_marker) ;
111+
112+ if let Some ( pos) = end_pos {
113+ let xml_end = pos + data_end_marker. len ( ) ;
114+
115+ // xml_end is relative "pending", we need to consume only new the tail
116+ channel. consume ( xml_end - pending_length) ;
117+
118+ // Only parse the XML portion
119+ let xml_chunk = & pending[ ..xml_end] ;
120+ let xml = match xmltree:: Element :: parse ( xml_chunk) {
121+ Ok ( x) => x,
122+ Err ( e) => {
123+ // Consume the bad data and continue
124+ bail ! ( "Failed to parse XML: {}" , e) ;
125+ }
126+ } ;
103127
104- for chunk in xml_fragments_indices . chunks ( 2 ) {
105- let start = chunk [ 0 ] ;
106- let end = * chunk . get ( 1 ) . unwrap_or ( & bytes_read ) ;
107- let xml = xmltree :: Element :: parse ( & buf [ start..end ] ) ? ;
128+ // The current message might have started in "pending", so clear it
129+ // now. No need to do this if we're bailing above, as it's a local
130+ // resource.
131+ pending . clear ( ) ;
108132
109133 if xml. name != "data" {
110134 // TODO: define a more verbose level
@@ -152,6 +176,12 @@ pub fn firehose_read<T: QdlChan>(
152176 // Pass other nodes to specialized parsers
153177 return response_parser ( channel, & e. attributes ) ;
154178 }
179+ } else {
180+ // Didn't find the tail of the XML document in "pending" +
181+ // "available", consume the data into "pending" to let fill_buf()
182+ // read more data from the underlying Read.
183+ let available_len = available. len ( ) ;
184+ channel. consume ( available_len) ;
155185 }
156186 }
157187}
@@ -482,7 +512,7 @@ pub fn firehose_read_storage(
482512 ) ?;
483513
484514 firehose_write ( channel, & mut xml) ?;
485- if firehose_read ( channel, firehose_parser_ack_nak) ? != FirehoseStatus :: Ack {
515+ if firehose_read ( channel, firehose_parser_ack_nak) . unwrap ( ) != FirehoseStatus :: Ack {
486516 bail ! ( "Read request was NAKed" ) ;
487517 }
488518
@@ -511,7 +541,7 @@ pub fn firehose_read_storage(
511541
512542 if !last_read_was_zero_len && channel. fh_config ( ) . backend == QdlBackend :: Usb {
513543 // Issue a dummy read to drain the queue
514- let _ = channel. read ( & mut [ ] ) ? ;
544+ let _ = channel. read ( & mut [ ] ) . unwrap ( ) ;
515545 }
516546
517547 if firehose_read ( channel, firehose_parser_ack_nak) ? != FirehoseStatus :: Ack {
0 commit comments