Skip to content

Commit d0d853d

Browse files
committed
Subscribe: Support for long-read based subscription
The following command works with this: chip-tool any subscribe-by-id 0xFFFFFFFF 0xFFFFFFFF 1 20 12344321 0xFFFF
1 parent 78d1462 commit d0d853d

File tree

5 files changed

+98
-49
lines changed

5 files changed

+98
-49
lines changed

matter/src/data_model/core/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18+
use self::subscribe::SubsCtx;
19+
1820
use super::{
1921
cluster_basic_information::BasicInfoConfig,
2022
device_types::device_type_add_root_node,
@@ -31,7 +33,7 @@ use crate::{
3133
core::{IMStatusCode, OpCode},
3234
messages::{
3335
ib::{self, AttrData, DataVersionFilter},
34-
msg::{self, InvReq, ReadReq, SubscribeReq, WriteReq},
36+
msg::{self, InvReq, ReadReq, WriteReq},
3537
GenericPath,
3638
},
3739
InteractionConsumer, Transaction,
@@ -320,9 +322,7 @@ impl InteractionConsumer for DataModel {
320322
let result = match *resume {
321323
ResumeReq::Read(ref mut read) => self.handle_resume_read(read, trans, tw)?,
322324

323-
ResumeReq::Subscribe(mut ctx) => {
324-
self.handle_subscription_confirm(trans, tw, &mut ctx)?
325-
}
325+
ResumeReq::Subscribe(ref mut ctx) => ctx.handle_status_report(trans, tw, self)?,
326326
};
327327
trans.exch.set_data_boxed(resume);
328328
Ok(result)
@@ -336,15 +336,15 @@ impl InteractionConsumer for DataModel {
336336

337337
fn consume_subscribe(
338338
&self,
339-
req: &SubscribeReq,
339+
rx_buf: &[u8],
340340
trans: &mut Transaction,
341341
tw: &mut TLVWriter,
342342
) -> Result<(OpCode, ResponseRequired), Error> {
343343
if !trans.exch.is_data_none() {
344344
error!("Exchange data already set!");
345345
return Err(Error::InvalidState);
346346
}
347-
let ctx = self.handle_subscribe_req(req, trans, tw)?;
347+
let ctx = SubsCtx::new(rx_buf, trans, tw, self)?;
348348
trans
349349
.exch
350350
.set_data_boxed(Box::new(ResumeReq::Subscribe(ctx)));

matter/src/data_model/core/read.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::{
3434
wb_shrink, wb_unshrink,
3535
};
3636
use log::error;
37+
3738
/// Encoder for generating a response to a read request
3839
pub struct AttrReadEncoder<'a, 'b, 'c> {
3940
tw: &'a mut TLVWriter<'b, 'c>,
@@ -107,13 +108,13 @@ impl<'a, 'b, 'c> Encoder for AttrReadEncoder<'a, 'b, 'c> {
107108
pub struct ResumeReadReq {
108109
/// The Read Request Attribute Path that caused chunking, and this is the path
109110
/// that needs to be resumed.
110-
pending_req: Option<Packet<'static>>,
111+
pub pending_req: Option<Packet<'static>>,
111112

112113
/// The Attribute that couldn't be encoded because our buffer got full. The next chunk
113114
/// will start encoding from this attribute onwards.
114115
/// Note that given wildcard reads, one PendingPath in the member above can generated
115116
/// multiple encode paths. Hence this has to be maintained separately.
116-
resume_from: Option<GenericPath>,
117+
pub resume_from: Option<GenericPath>,
117118
}
118119
impl ResumeReadReq {
119120
pub fn new(rx_buf: &[u8], resume_from: &Option<GenericPath>) -> Result<Self, Error> {

matter/src/data_model/core/subscribe.rs

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,71 +21,122 @@ use crate::{
2121
error::Error,
2222
interaction_model::{
2323
core::OpCode,
24-
messages::msg::{self, SubscribeReq, SubscribeResp},
24+
messages::{
25+
msg::{self, SubscribeReq, SubscribeResp},
26+
GenericPath,
27+
},
2528
},
26-
tlv::{TLVWriter, TagType, ToTLV},
29+
tlv::{self, get_root_node_struct, FromTLV, TLVWriter, TagType, ToTLV},
2730
transport::proto_demux::ResponseRequired,
2831
};
2932

30-
use super::{DataModel, Transaction};
33+
use super::{read::ResumeReadReq, DataModel, Transaction};
3134

3235
static SUBS_ID: AtomicU32 = AtomicU32::new(1);
3336

34-
impl DataModel {
35-
pub fn handle_subscribe_req(
36-
&self,
37-
req: &SubscribeReq,
37+
#[derive(PartialEq)]
38+
enum SubsState {
39+
Confirming,
40+
Confirmed,
41+
}
42+
43+
pub struct SubsCtx {
44+
state: SubsState,
45+
id: u32,
46+
resume_read_req: Option<ResumeReadReq>,
47+
}
48+
49+
impl SubsCtx {
50+
pub fn new(
51+
rx_buf: &[u8],
3852
trans: &mut Transaction,
3953
tw: &mut TLVWriter,
40-
) -> Result<SubsCtx, Error> {
41-
let ctx = SubsCtx {
54+
dm: &DataModel,
55+
) -> Result<Self, Error> {
56+
let root = get_root_node_struct(rx_buf)?;
57+
let req = SubscribeReq::from_tlv(&root)?;
58+
59+
let mut ctx = SubsCtx {
4260
state: SubsState::Confirming,
4361
// TODO
4462
id: SUBS_ID.fetch_add(1, Ordering::SeqCst),
63+
resume_read_req: None,
4564
};
4665

47-
let read_req = req.to_read_req();
48-
tw.start_struct(TagType::Anonymous)?;
49-
tw.u32(
50-
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
51-
ctx.id,
52-
)?;
5366
let mut resume_from = None;
54-
self.handle_read_attr_array(&read_req, trans, tw, &mut resume_from)?;
55-
tw.end_container()?;
56-
67+
ctx.do_read(&req, trans, tw, dm, &mut resume_from)?;
68+
if resume_from.is_some() {
69+
// This is a multi-hop read transaction, remember this read request
70+
ctx.resume_read_req = Some(ResumeReadReq::new(rx_buf, &resume_from)?);
71+
}
5772
Ok(ctx)
5873
}
5974

60-
pub fn handle_subscription_confirm(
61-
&self,
75+
pub fn handle_status_report(
76+
&mut self,
6277
trans: &mut Transaction,
6378
tw: &mut TLVWriter,
64-
ctx: &mut SubsCtx,
79+
dm: &DataModel,
6580
) -> Result<(OpCode, ResponseRequired), Error> {
66-
if ctx.state != SubsState::Confirming {
81+
if self.state != SubsState::Confirming {
6782
// Not relevant for us
6883
trans.complete();
6984
return Err(Error::Invalid);
7085
}
71-
ctx.state = SubsState::Confirmed;
86+
87+
// Is there a previous resume read pending
88+
if self.resume_read_req.is_some() {
89+
let mut resume_read_req = self.resume_read_req.take().unwrap();
90+
if let Some(packet) = resume_read_req.pending_req.as_mut() {
91+
let rx_buf = packet.get_parsebuf()?.as_borrow_slice();
92+
let root = tlv::get_root_node(rx_buf)?;
93+
let req = SubscribeReq::from_tlv(&root)?;
94+
95+
self.do_read(&req, trans, tw, dm, &mut resume_read_req.resume_from)?;
96+
if resume_read_req.resume_from.is_some() {
97+
// More chunks are pending, setup resume_read_req again
98+
self.resume_read_req = Some(resume_read_req);
99+
}
100+
101+
return Ok((OpCode::ReportData, ResponseRequired::Yes));
102+
}
103+
}
104+
105+
// We are here implies that the read is now complete
106+
self.confirm_subscription(trans, tw)
107+
}
108+
109+
fn confirm_subscription(
110+
&mut self,
111+
trans: &mut Transaction,
112+
tw: &mut TLVWriter,
113+
) -> Result<(OpCode, ResponseRequired), Error> {
114+
self.state = SubsState::Confirmed;
72115

73116
// TODO
74-
let resp = SubscribeResp::new(ctx.id, 40);
117+
let resp = SubscribeResp::new(self.id, 40);
75118
resp.to_tlv(tw, TagType::Anonymous)?;
76119
trans.complete();
77120
Ok((OpCode::SubscriptResponse, ResponseRequired::Yes))
78121
}
79-
}
80122

81-
#[derive(PartialEq, Clone, Copy)]
82-
enum SubsState {
83-
Confirming,
84-
Confirmed,
85-
}
123+
fn do_read(
124+
&mut self,
125+
req: &SubscribeReq,
126+
trans: &mut Transaction,
127+
tw: &mut TLVWriter,
128+
dm: &DataModel,
129+
resume_from: &mut Option<GenericPath>,
130+
) -> Result<(), Error> {
131+
let read_req = req.to_read_req();
132+
tw.start_struct(TagType::Anonymous)?;
133+
tw.u32(
134+
TagType::Context(msg::ReportDataTag::SubscriptionId as u8),
135+
self.id,
136+
)?;
137+
dm.handle_read_attr_array(&read_req, trans, tw, resume_from)?;
138+
tw.end_container()?;
86139

87-
#[derive(Clone, Copy)]
88-
pub struct SubsCtx {
89-
state: SubsState,
90-
id: u32,
140+
Ok(())
141+
}
91142
}

matter/src/interaction_model/core.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ use log::{error, info};
3333
use num;
3434
use num_derive::FromPrimitive;
3535

36+
use super::InteractionModel;
3637
use super::Transaction;
3738
use super::TransactionState;
38-
use super::{messages::msg::SubscribeReq, InteractionModel};
3939
use super::{messages::msg::TimedReq, InteractionConsumer};
4040

4141
/* Handle messages related to the Interation Model
@@ -107,10 +107,7 @@ impl InteractionModel {
107107
proto_tx: &mut Packet,
108108
) -> Result<ResponseRequired, Error> {
109109
let mut tw = TLVWriter::new(proto_tx.get_writebuf()?);
110-
let root = get_root_node_struct(rx_buf)?;
111-
let req = SubscribeReq::from_tlv(&root)?;
112-
113-
let (opcode, resp) = self.consumer.consume_subscribe(&req, trans, &mut tw)?;
110+
let (opcode, resp) = self.consumer.consume_subscribe(rx_buf, trans, &mut tw)?;
114111
proto_tx.set_proto_opcode(opcode as u8);
115112
Ok(resp)
116113
}

matter/src/interaction_model/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323

2424
use self::{
2525
core::OpCode,
26-
messages::msg::{InvReq, StatusResp, SubscribeReq, WriteReq},
26+
messages::msg::{InvReq, StatusResp, WriteReq},
2727
};
2828

2929
#[derive(PartialEq)]
@@ -70,7 +70,7 @@ pub trait InteractionConsumer {
7070

7171
fn consume_subscribe(
7272
&self,
73-
_req: &SubscribeReq,
73+
_req: &[u8],
7474
_trans: &mut Transaction,
7575
_tw: &mut TLVWriter,
7676
) -> Result<(OpCode, ResponseRequired), Error>;

0 commit comments

Comments
 (0)