Skip to content

Commit 9b5a253

Browse files
authored
Merge pull request #28 from kedars/feature/large_reads
Support multi-leg reads
2 parents 7357d6d + 9aeb391 commit 9b5a253

File tree

16 files changed

+671
-290
lines changed

16 files changed

+671
-290
lines changed

matter/src/data_model/core.rs renamed to matter/src/data_model/core/mod.rs

Lines changed: 61 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18+
use self::subscribe::SubsCtx;
19+
1820
use super::{
1921
cluster_basic_information::BasicInfoConfig,
2022
device_types::device_type_add_root_node,
@@ -28,7 +30,7 @@ use crate::{
2830
fabric::FabricMgr,
2931
interaction_model::{
3032
command::CommandReq,
31-
core::IMStatusCode,
33+
core::{IMStatusCode, OpCode},
3234
messages::{
3335
ib::{self, AttrData, DataVersionFilter},
3436
msg::{self, InvReq, ReadReq, WriteReq},
@@ -37,8 +39,11 @@ use crate::{
3739
InteractionConsumer, Transaction,
3840
},
3941
secure_channel::pake::PaseMgr,
40-
tlv::{TLVArray, TLVWriter, TagType, ToTLV},
41-
transport::session::{Session, SessionMode},
42+
tlv::{self, FromTLV, TLVArray, TLVWriter, TagType, ToTLV},
43+
transport::{
44+
proto_demux::ResponseRequired,
45+
session::{Session, SessionMode},
46+
},
4247
};
4348
use log::{error, info};
4449
use std::sync::{Arc, RwLock};
@@ -76,17 +81,6 @@ impl DataModel {
7681
Ok(dm)
7782
}
7883

79-
pub fn read_attribute_raw(
80-
&self,
81-
endpoint: u16,
82-
cluster: u32,
83-
attr: u16,
84-
) -> Result<AttrValue, IMStatusCode> {
85-
let node = self.node.read().unwrap();
86-
let cluster = node.get_cluster(endpoint, cluster)?;
87-
cluster.base().read_attribute_raw(attr).map(|a| a.clone())
88-
}
89-
9084
// Encode a write attribute from a path that may or may not be wildcard
9185
fn handle_write_attr_path(
9286
node: &mut Node,
@@ -153,42 +147,6 @@ impl DataModel {
153147
}
154148
}
155149

156-
// Encode a read attribute from a path that may or may not be wildcard
157-
fn handle_read_attr_path(
158-
node: &Node,
159-
accessor: &Accessor,
160-
attr_encoder: &mut AttrReadEncoder,
161-
attr_details: &mut AttrDetails,
162-
) {
163-
let path = attr_encoder.path;
164-
// Skip error reporting for wildcard paths, don't for concrete paths
165-
attr_encoder.skip_error(path.is_wildcard());
166-
167-
let result = node.for_each_attribute(&path, |path, c| {
168-
// Ignore processing if data filter matches.
169-
// For a wildcard attribute, this may end happening unnecessarily for all attributes, although
170-
// a single skip for the cluster is sufficient. That requires us to replace this for_each with a
171-
// for_each_cluster
172-
let cluster_data_ver = c.base().get_dataver();
173-
if Self::data_filter_matches(&attr_encoder.data_ver_filters, path, cluster_data_ver) {
174-
return Ok(());
175-
}
176-
177-
attr_details.attr_id = path.leaf.unwrap_or_default() as u16;
178-
// Overwrite the previous path with the concrete path
179-
attr_encoder.set_path(*path);
180-
// Set the cluster's data version
181-
attr_encoder.set_data_ver(cluster_data_ver);
182-
let mut access_req = AccessReq::new(accessor, path, Access::READ);
183-
Cluster::read_attribute(c, &mut access_req, attr_encoder, attr_details);
184-
Ok(())
185-
});
186-
if let Err(e) = result {
187-
// We hit this only if this is a non-wildcard path
188-
attr_encoder.encode_status(e, 0);
189-
}
190-
}
191-
192150
// Handle command from a path that may or may not be wildcard
193151
fn handle_command_path(node: &mut Node, cmd_req: &mut CommandReq) {
194152
let wildcard = cmd_req.cmd.path.is_wildcard();
@@ -266,6 +224,15 @@ impl DataModel {
266224
}
267225
}
268226

227+
pub mod read;
228+
pub mod subscribe;
229+
230+
/// Type of Resume Request
231+
enum ResumeReq {
232+
Subscribe(subscribe::SubsCtx),
233+
Read(read::ResumeReadReq),
234+
}
235+
269236
impl objects::ChangeConsumer for DataModel {
270237
fn endpoint_added(&self, id: u16, endpoint: &mut Endpoint) -> Result<(), Error> {
271238
endpoint.add_cluster(DescriptorCluster::new(id, self.clone())?)?;
@@ -294,45 +261,22 @@ impl InteractionConsumer for DataModel {
294261

295262
fn consume_read_attr(
296263
&self,
297-
read_req: &ReadReq,
264+
rx_buf: &[u8],
298265
trans: &mut Transaction,
299266
tw: &mut TLVWriter,
300267
) -> Result<(), Error> {
301-
let mut attr_encoder = AttrReadEncoder::new(tw);
302-
if let Some(filters) = &read_req.dataver_filters {
303-
attr_encoder.set_data_ver_filters(filters);
304-
}
305-
306-
let mut attr_details = AttrDetails {
307-
// This will be updated internally
308-
attr_id: 0,
309-
// This will be updated internally
310-
list_index: None,
311-
// This will be updated internally
312-
fab_idx: 0,
313-
fab_filter: read_req.fabric_filtered,
314-
};
315-
316-
if let Some(attr_requests) = &read_req.attr_requests {
317-
let accessor = self.sess_to_accessor(trans.session);
318-
let node = self.node.read().unwrap();
319-
attr_encoder
320-
.tw
321-
.start_array(TagType::Context(msg::ReportDataTag::AttributeReports as u8))?;
322-
323-
for attr_path in attr_requests.iter() {
324-
attr_encoder.set_path(attr_path.to_gp());
325-
// Extract the attr_path fields into various structures
326-
attr_details.list_index = attr_path.list_index;
327-
attr_details.fab_idx = accessor.fab_idx;
328-
DataModel::handle_read_attr_path(
329-
&node,
330-
&accessor,
331-
&mut attr_encoder,
332-
&mut attr_details,
333-
);
268+
let mut resume_from = None;
269+
let root = tlv::get_root_node(rx_buf)?;
270+
let req = ReadReq::from_tlv(&root)?;
271+
self.handle_read_req(&req, trans, tw, &mut resume_from)?;
272+
if resume_from.is_some() {
273+
// This is a multi-hop read transaction, remember this read request
274+
let resume = read::ResumeReadReq::new(rx_buf, &resume_from)?;
275+
if !trans.exch.is_data_none() {
276+
error!("Exchange data already set, and multi-hop read");
277+
return Err(Error::InvalidState);
334278
}
335-
tw.end_container()?;
279+
trans.exch.set_data_boxed(Box::new(ResumeReq::Read(resume)));
336280
}
337281
Ok(())
338282
}
@@ -367,61 +311,44 @@ impl InteractionConsumer for DataModel {
367311

368312
Ok(())
369313
}
370-
}
371314

372-
/// Encoder for generating a response to a read request
373-
pub struct AttrReadEncoder<'a, 'b, 'c> {
374-
tw: &'a mut TLVWriter<'b, 'c>,
375-
data_ver: u32,
376-
path: GenericPath,
377-
skip_error: bool,
378-
data_ver_filters: Option<&'a TLVArray<'a, DataVersionFilter>>,
379-
}
315+
fn consume_status_report(
316+
&self,
317+
req: &msg::StatusResp,
318+
trans: &mut Transaction,
319+
tw: &mut TLVWriter,
320+
) -> Result<(OpCode, ResponseRequired), Error> {
321+
if let Some(mut resume) = trans.exch.take_data_boxed::<ResumeReq>() {
322+
let result = match *resume {
323+
ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?,
380324

381-
impl<'a, 'b, 'c> AttrReadEncoder<'a, 'b, 'c> {
382-
pub fn new(tw: &'a mut TLVWriter<'b, 'c>) -> Self {
383-
Self {
384-
tw,
385-
data_ver: 0,
386-
skip_error: false,
387-
path: Default::default(),
388-
data_ver_filters: None,
325+
ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?,
326+
};
327+
trans.exch.set_data_boxed(resume);
328+
Ok(result)
329+
} else {
330+
// Nothing to do for now
331+
trans.complete();
332+
info!("Received status report with status {:?}", req.status);
333+
Ok((OpCode::Reserved, ResponseRequired::No))
389334
}
390335
}
391336

392-
pub fn skip_error(&mut self, skip: bool) {
393-
self.skip_error = skip;
394-
}
395-
396-
pub fn set_data_ver(&mut self, data_ver: u32) {
397-
self.data_ver = data_ver;
398-
}
399-
400-
pub fn set_data_ver_filters(&mut self, filters: &'a TLVArray<'a, DataVersionFilter>) {
401-
self.data_ver_filters = Some(filters);
402-
}
403-
404-
pub fn set_path(&mut self, path: GenericPath) {
405-
self.path = path;
406-
}
407-
}
408-
409-
impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
410-
fn encode(&mut self, value: EncodeValue) {
411-
let resp = ib::AttrResp::Data(ib::AttrData::new(
412-
Some(self.data_ver),
413-
ib::AttrPath::new(&self.path),
414-
value,
415-
));
416-
let _ = resp.to_tlv(self.tw, TagType::Anonymous);
417-
}
418-
419-
fn encode_status(&mut self, status: IMStatusCode, cluster_status: u16) {
420-
if !self.skip_error {
421-
let resp =
422-
ib::AttrResp::Status(ib::AttrStatus::new(&self.path, status, cluster_status));
423-
let _ = resp.to_tlv(self.tw, TagType::Anonymous);
337+
fn consume_subscribe(
338+
&self,
339+
rx_buf: &[u8],
340+
trans: &mut Transaction,
341+
tw: &mut TLVWriter,
342+
) -> Result<(OpCode, ResponseRequired), Error> {
343+
if !trans.exch.is_data_none() {
344+
error!("Exchange data already set!");
345+
return Err(Error::InvalidState);
424346
}
347+
let ctx = SubsCtx::new(rx_buf, trans, tw, self)?;
348+
trans
349+
.exch
350+
.set_data_boxed(Box::new(ResumeReq::Subscribe(ctx)));
351+
Ok((OpCode::ReportData, ResponseRequired::Yes))
425352
}
426353
}
427354

0 commit comments

Comments
 (0)