Skip to content

Commit 487124c

Browse files
committed
DataModel: Capture pending request for chunked read messages
1 parent 79d6169 commit 487124c

File tree

2 files changed

+33
-17
lines changed

2 files changed

+33
-17
lines changed

matter/src/data_model/core/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ pub mod subscribe;
228228
/// Type of Resume Request
229229
enum ResumeReq {
230230
Subscribe(subscribe::SubsCtx),
231-
Read,
231+
Read(read::ResumeReadReq),
232232
}
233233

234234
impl objects::ChangeConsumer for DataModel {
@@ -263,8 +263,15 @@ impl InteractionConsumer for DataModel {
263263
trans: &mut Transaction,
264264
tw: &mut TLVWriter,
265265
) -> Result<(), Error> {
266-
let is_chunked = self.handle_read_attr_array(req, trans, tw)?;
267-
if !is_chunked {
266+
let resume = self.handle_read_attr_array(req, trans, tw)?;
267+
if let Some(resume) = resume {
268+
// This is a multi-hop read transaction, remember this read request
269+
if !trans.exch.is_data_none() {
270+
error!("Exchange data already set, and multi-hop read");
271+
return Err(Error::InvalidState);
272+
}
273+
trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
274+
} else {
268275
tw.bool(TagType::Context(SupressResponse as u8), true)?;
269276
// Mark transaction complete, if not chunked
270277
trans.complete();
@@ -311,7 +318,7 @@ impl InteractionConsumer for DataModel {
311318
) -> Result<(OpCode, ResponseRequired), Error> {
312319
if let Some(resume) = trans.exch.take_data_boxed::<ResumeReq>() {
313320
match *resume {
314-
ResumeReq::Read => Ok((OpCode::Reserved, ResponseRequired::No)),
321+
ResumeReq::Read(_) => Ok((OpCode::Reserved, ResponseRequired::No)),
315322
ResumeReq::Subscribe(mut ctx) => {
316323
let result = self.handle_subscription_confirm(trans, tw, &mut ctx)?;
317324
trans.exch.set_data_boxed(resume);

matter/src/data_model/core/read.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717

1818
use crate::data_model::{core::DataModel, objects::*};
19+
use crate::transport::packet::Packet;
20+
use crate::utils::writebuf::WriteBuf;
1921
use crate::{
2022
acl::{AccessReq, Accessor},
2123
error::*,
@@ -104,12 +106,7 @@ impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
104106
pub struct ResumeReadReq {
105107
/// The Read Request Attribute Path that caused chunking, and this is the path
106108
/// that needs to be resumed.
107-
///
108-
/// TODO: Ideally, the entire ReadRequest (with any subsequent AttrPaths) should also
109-
/// be maintained. But for now, we just store the AttrPath that caused the overflow
110-
/// and chunking. Hopefully, the other end requests any pending paths when it sees no
111-
/// more chunks.
112-
pending_path: GenericPath,
109+
pending_req: Option<Packet<'static>>,
113110

114111
/// The Attribute that couldn't be encoded because our buffer got full. The next chunk
115112
/// will start encoding from this attribute onwards.
@@ -198,7 +195,7 @@ impl DataModel {
198195
read_req: &ReadReq,
199196
trans: &mut Transaction,
200197
tw: &mut TLVWriter,
201-
) -> Result<bool, Error> {
198+
) -> Result<Option<ResumeReadReq>, Error> {
202199
let mut resume_read_req: ResumeReadReq = Default::default();
203200

204201
let mut attr_encoder = AttrReadEncoder::new(tw);
@@ -226,19 +223,31 @@ impl DataModel {
226223
&mut attr_details,
227224
&mut resume_read_req.resume_encode,
228225
);
229-
if result.is_err() {
230-
resume_read_req.pending_path = attr_path.to_gp();
231-
break;
232-
}
233226
}
234227
tw.end_container()?;
235228
if result.is_err() {
236229
// If there was an error, indicate chunking. The resume_read_req would have been
237230
// already populated from in the loop above.
238231
tw.bool(TagType::Context(MoreChunkedMsgs as u8), true)?;
239-
return Ok(true);
232+
// Retain the entire request, because we need the data-filters, and subsequent attr-reads, if any
233+
// when we resume this read in the next hop
234+
resume_read_req.pending_req = Some(copy_read_req_to_packet(read_req)?);
235+
return Ok(Some(resume_read_req));
240236
}
241237
}
242-
Ok(false)
238+
Ok(None)
243239
}
244240
}
241+
242+
fn copy_read_req_to_packet(read_req: &ReadReq) -> Result<Packet<'static>, Error> {
243+
let mut packet = Packet::new_rx()?;
244+
let backup = packet.as_borrow_slice();
245+
let backup_len = backup.len();
246+
let mut wb = WriteBuf::new(backup, backup_len);
247+
let mut tw = TLVWriter::new(&mut wb);
248+
// TODO: This is unnecessarily wasteful, could directly copy &[u8] if accessible
249+
read_req.to_tlv(&mut tw, TagType::Anonymous)?;
250+
let data_len = wb.as_borrow_slice().len();
251+
packet.get_parsebuf()?.set_len(data_len);
252+
Ok(packet)
253+
}

0 commit comments

Comments
 (0)