Skip to content

Commit 24cdf07

Browse files
committed
New helper APIs in Transport
1 parent 0d73ba7 commit 24cdf07

File tree

2 files changed

+196
-165
lines changed

2 files changed

+196
-165
lines changed

matter/src/transport/core.rs

Lines changed: 174 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,23 @@
1717

1818
use core::{borrow::Borrow, cell::RefCell};
1919

20-
use crate::{error::ErrorCode, secure_channel::common::OpCode, Matter};
2120
use embassy_futures::select::select;
21+
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, channel::Channel};
2222
use embassy_time::{Duration, Timer};
23-
use log::info;
23+
24+
use log::{error, info, warn};
2425

2526
use crate::{
26-
error::Error, secure_channel::common::PROTO_ID_SECURE_CHANNEL, transport::packet::Packet,
27+
alloc,
28+
data_model::{core::DataModel, objects::DataModelHandler},
29+
error::{Error, ErrorCode},
30+
interaction_model::core::PROTO_ID_INTERACTION_MODEL,
31+
secure_channel::{
32+
common::{OpCode, PROTO_ID_SECURE_CHANNEL},
33+
core::SecureChannel,
34+
},
35+
transport::packet::Packet,
36+
Matter,
2737
};
2838

2939
use super::{
@@ -32,6 +42,8 @@ use super::{
3242
MAX_EXCHANGES,
3343
},
3444
mrp::ReliableMessage,
45+
packet::{MAX_RX_BUF_SIZE, MAX_RX_STATUS_BUF_SIZE, MAX_TX_BUF_SIZE},
46+
pipe::{Chunk, Pipe},
3547
session::SessionMgr,
3648
};
3749

@@ -83,6 +95,165 @@ impl<'a> Transport<'a> {
8395
unimplemented!()
8496
}
8597

98+
#[inline(always)]
99+
pub async fn handle_tx(&self, tx_pipe: &Pipe<'_>) -> Result<(), Error> {
100+
loop {
101+
loop {
102+
{
103+
let mut data = tx_pipe.data.lock().await;
104+
105+
if data.chunk.is_none() {
106+
let mut tx = alloc!(Packet::new_tx(data.buf));
107+
108+
if self.pull_tx(&mut tx).await? {
109+
data.chunk = Some(Chunk {
110+
start: tx.get_writebuf()?.get_start(),
111+
end: tx.get_writebuf()?.get_tail(),
112+
addr: tx.peer,
113+
});
114+
tx_pipe.data_supplied_notification.signal(());
115+
} else {
116+
break;
117+
}
118+
}
119+
}
120+
121+
tx_pipe.data_consumed_notification.wait().await;
122+
}
123+
124+
self.wait_tx().await?;
125+
}
126+
}
127+
128+
#[inline(always)]
129+
pub async fn handle_rx_multiplex<'t, 'e, const N: usize>(
130+
&'t self,
131+
rx_pipe: &Pipe<'_>,
132+
construction_notification: &'e Notification,
133+
channel: &Channel<NoopRawMutex, ExchangeCtr<'e>, N>,
134+
) -> Result<(), Error>
135+
where
136+
't: 'e,
137+
{
138+
loop {
139+
info!("Transport: waiting for incoming packets");
140+
141+
{
142+
let mut data = rx_pipe.data.lock().await;
143+
144+
if let Some(chunk) = data.chunk {
145+
let mut rx = alloc!(Packet::new_rx(&mut data.buf[chunk.start..chunk.end]));
146+
rx.peer = chunk.addr;
147+
148+
if let Some(exchange_ctr) =
149+
self.process_rx(construction_notification, &mut rx)?
150+
{
151+
let exchange_id = exchange_ctr.id().clone();
152+
153+
info!("Transport: got new exchange: {:?}", exchange_id);
154+
155+
channel.send(exchange_ctr).await;
156+
info!("Transport: exchange sent");
157+
158+
self.wait_construction(construction_notification, &rx, &exchange_id)
159+
.await?;
160+
161+
info!("Transport: exchange started");
162+
}
163+
164+
data.chunk = None;
165+
rx_pipe.data_consumed_notification.signal(());
166+
}
167+
}
168+
169+
rx_pipe.data_supplied_notification.wait().await
170+
}
171+
172+
#[allow(unreachable_code)]
173+
Ok::<_, Error>(())
174+
}
175+
176+
#[inline(always)]
177+
pub async fn exchange_handler<const N: usize, H>(
178+
&self,
179+
tx_buf: &mut [u8; MAX_TX_BUF_SIZE],
180+
rx_buf: &mut [u8; MAX_RX_BUF_SIZE],
181+
sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE],
182+
handler_id: impl core::fmt::Display,
183+
channel: &Channel<NoopRawMutex, ExchangeCtr<'_>, N>,
184+
handler: &H,
185+
) -> Result<(), Error>
186+
where
187+
H: DataModelHandler,
188+
{
189+
loop {
190+
let exchange_ctr: ExchangeCtr<'_> = channel.recv().await;
191+
192+
info!(
193+
"Handler {}: Got exchange {:?}",
194+
handler_id,
195+
exchange_ctr.id()
196+
);
197+
198+
let result = self
199+
.handle_exchange(tx_buf, rx_buf, sx_buf, exchange_ctr, handler)
200+
.await;
201+
202+
if let Err(err) = result {
203+
warn!(
204+
"Handler {}: Exchange closed because of error: {:?}",
205+
handler_id, err
206+
);
207+
} else {
208+
info!("Handler {}: Exchange completed", handler_id);
209+
}
210+
}
211+
}
212+
213+
#[inline(always)]
214+
#[cfg_attr(feature = "nightly", allow(clippy::await_holding_refcell_ref))] // Fine because of the async mutex
215+
pub async fn handle_exchange<H>(
216+
&self,
217+
tx_buf: &mut [u8; MAX_TX_BUF_SIZE],
218+
rx_buf: &mut [u8; MAX_RX_BUF_SIZE],
219+
sx_buf: &mut [u8; MAX_RX_STATUS_BUF_SIZE],
220+
exchange_ctr: ExchangeCtr<'_>,
221+
handler: &H,
222+
) -> Result<(), Error>
223+
where
224+
H: DataModelHandler,
225+
{
226+
let mut tx = alloc!(Packet::new_tx(tx_buf.as_mut()));
227+
let mut rx = alloc!(Packet::new_rx(rx_buf.as_mut()));
228+
229+
let mut exchange = alloc!(exchange_ctr.get(&mut rx).await?);
230+
231+
match rx.get_proto_id() {
232+
PROTO_ID_SECURE_CHANNEL => {
233+
let sc = SecureChannel::new(self.matter());
234+
235+
sc.handle(&mut exchange, &mut rx, &mut tx).await?;
236+
237+
self.matter().notify_changed();
238+
}
239+
PROTO_ID_INTERACTION_MODEL => {
240+
let dm = DataModel::new(handler);
241+
242+
let mut rx_status = alloc!(Packet::new_rx(sx_buf));
243+
244+
dm.handle(&mut exchange, &mut rx, &mut tx, &mut rx_status)
245+
.await?;
246+
247+
self.matter().notify_changed();
248+
}
249+
other => {
250+
error!("Unknown Proto-ID: {}", other);
251+
}
252+
}
253+
254+
Ok(())
255+
}
256+
86257
pub fn process_rx<'r>(
87258
&'r self,
88259
construction_notification: &'r Notification,

0 commit comments

Comments
 (0)