17
17
18
18
use core:: { borrow:: Borrow , cell:: RefCell } ;
19
19
20
- use crate :: { error:: ErrorCode , secure_channel:: common:: OpCode , Matter } ;
21
20
use embassy_futures:: select:: select;
21
+ use embassy_sync:: { blocking_mutex:: raw:: NoopRawMutex , channel:: Channel } ;
22
22
use embassy_time:: { Duration , Timer } ;
23
- use log:: info;
23
+
24
+ use log:: { error, info, warn} ;
24
25
25
26
use crate :: {
26
- error:: Error , secure_channel:: common:: PROTO_ID_SECURE_CHANNEL , transport:: packet:: Packet ,
27
+ alloc,
28
+ data_model:: { core:: DataModel , objects:: DataModelHandler } ,
29
+ error:: { Error , ErrorCode } ,
30
+ interaction_model:: core:: PROTO_ID_INTERACTION_MODEL ,
31
+ secure_channel:: {
32
+ common:: { OpCode , PROTO_ID_SECURE_CHANNEL } ,
33
+ core:: SecureChannel ,
34
+ } ,
35
+ transport:: packet:: Packet ,
36
+ Matter ,
27
37
} ;
28
38
29
39
use super :: {
@@ -32,6 +42,8 @@ use super::{
32
42
MAX_EXCHANGES ,
33
43
} ,
34
44
mrp:: ReliableMessage ,
45
+ packet:: { MAX_RX_BUF_SIZE , MAX_RX_STATUS_BUF_SIZE , MAX_TX_BUF_SIZE } ,
46
+ pipe:: { Chunk , Pipe } ,
35
47
session:: SessionMgr ,
36
48
} ;
37
49
@@ -83,6 +95,165 @@ impl<'a> Transport<'a> {
83
95
unimplemented ! ( )
84
96
}
85
97
98
+ #[ inline( always) ]
99
+ pub async fn handle_tx ( & self , tx_pipe : & Pipe < ' _ > ) -> Result < ( ) , Error > {
100
+ loop {
101
+ loop {
102
+ {
103
+ let mut data = tx_pipe. data . lock ( ) . await ;
104
+
105
+ if data. chunk . is_none ( ) {
106
+ let mut tx = alloc ! ( Packet :: new_tx( data. buf) ) ;
107
+
108
+ if self . pull_tx ( & mut tx) . await ? {
109
+ data. chunk = Some ( Chunk {
110
+ start : tx. get_writebuf ( ) ?. get_start ( ) ,
111
+ end : tx. get_writebuf ( ) ?. get_tail ( ) ,
112
+ addr : tx. peer ,
113
+ } ) ;
114
+ tx_pipe. data_supplied_notification . signal ( ( ) ) ;
115
+ } else {
116
+ break ;
117
+ }
118
+ }
119
+ }
120
+
121
+ tx_pipe. data_consumed_notification . wait ( ) . await ;
122
+ }
123
+
124
+ self . wait_tx ( ) . await ?;
125
+ }
126
+ }
127
+
128
+ #[ inline( always) ]
129
+ pub async fn handle_rx_multiplex < ' t , ' e , const N : usize > (
130
+ & ' t self ,
131
+ rx_pipe : & Pipe < ' _ > ,
132
+ construction_notification : & ' e Notification ,
133
+ channel : & Channel < NoopRawMutex , ExchangeCtr < ' e > , N > ,
134
+ ) -> Result < ( ) , Error >
135
+ where
136
+ ' t : ' e ,
137
+ {
138
+ loop {
139
+ info ! ( "Transport: waiting for incoming packets" ) ;
140
+
141
+ {
142
+ let mut data = rx_pipe. data . lock ( ) . await ;
143
+
144
+ if let Some ( chunk) = data. chunk {
145
+ let mut rx = alloc ! ( Packet :: new_rx( & mut data. buf[ chunk. start..chunk. end] ) ) ;
146
+ rx. peer = chunk. addr ;
147
+
148
+ if let Some ( exchange_ctr) =
149
+ self . process_rx ( construction_notification, & mut rx) ?
150
+ {
151
+ let exchange_id = exchange_ctr. id ( ) . clone ( ) ;
152
+
153
+ info ! ( "Transport: got new exchange: {:?}" , exchange_id) ;
154
+
155
+ channel. send ( exchange_ctr) . await ;
156
+ info ! ( "Transport: exchange sent" ) ;
157
+
158
+ self . wait_construction ( construction_notification, & rx, & exchange_id)
159
+ . await ?;
160
+
161
+ info ! ( "Transport: exchange started" ) ;
162
+ }
163
+
164
+ data. chunk = None ;
165
+ rx_pipe. data_consumed_notification . signal ( ( ) ) ;
166
+ }
167
+ }
168
+
169
+ rx_pipe. data_supplied_notification . wait ( ) . await
170
+ }
171
+
172
+ #[ allow( unreachable_code) ]
173
+ Ok :: < _ , Error > ( ( ) )
174
+ }
175
+
176
+ #[ inline( always) ]
177
+ pub async fn exchange_handler < const N : usize , H > (
178
+ & self ,
179
+ tx_buf : & mut [ u8 ; MAX_TX_BUF_SIZE ] ,
180
+ rx_buf : & mut [ u8 ; MAX_RX_BUF_SIZE ] ,
181
+ sx_buf : & mut [ u8 ; MAX_RX_STATUS_BUF_SIZE ] ,
182
+ handler_id : impl core:: fmt:: Display ,
183
+ channel : & Channel < NoopRawMutex , ExchangeCtr < ' _ > , N > ,
184
+ handler : & H ,
185
+ ) -> Result < ( ) , Error >
186
+ where
187
+ H : DataModelHandler ,
188
+ {
189
+ loop {
190
+ let exchange_ctr: ExchangeCtr < ' _ > = channel. recv ( ) . await ;
191
+
192
+ info ! (
193
+ "Handler {}: Got exchange {:?}" ,
194
+ handler_id,
195
+ exchange_ctr. id( )
196
+ ) ;
197
+
198
+ let result = self
199
+ . handle_exchange ( tx_buf, rx_buf, sx_buf, exchange_ctr, handler)
200
+ . await ;
201
+
202
+ if let Err ( err) = result {
203
+ warn ! (
204
+ "Handler {}: Exchange closed because of error: {:?}" ,
205
+ handler_id, err
206
+ ) ;
207
+ } else {
208
+ info ! ( "Handler {}: Exchange completed" , handler_id) ;
209
+ }
210
+ }
211
+ }
212
+
213
+ #[ inline( always) ]
214
+ #[ cfg_attr( feature = "nightly" , allow( clippy:: await_holding_refcell_ref) ) ] // Fine because of the async mutex
215
+ pub async fn handle_exchange < H > (
216
+ & self ,
217
+ tx_buf : & mut [ u8 ; MAX_TX_BUF_SIZE ] ,
218
+ rx_buf : & mut [ u8 ; MAX_RX_BUF_SIZE ] ,
219
+ sx_buf : & mut [ u8 ; MAX_RX_STATUS_BUF_SIZE ] ,
220
+ exchange_ctr : ExchangeCtr < ' _ > ,
221
+ handler : & H ,
222
+ ) -> Result < ( ) , Error >
223
+ where
224
+ H : DataModelHandler ,
225
+ {
226
+ let mut tx = alloc ! ( Packet :: new_tx( tx_buf. as_mut( ) ) ) ;
227
+ let mut rx = alloc ! ( Packet :: new_rx( rx_buf. as_mut( ) ) ) ;
228
+
229
+ let mut exchange = alloc ! ( exchange_ctr. get( & mut rx) . await ?) ;
230
+
231
+ match rx. get_proto_id ( ) {
232
+ PROTO_ID_SECURE_CHANNEL => {
233
+ let sc = SecureChannel :: new ( self . matter ( ) ) ;
234
+
235
+ sc. handle ( & mut exchange, & mut rx, & mut tx) . await ?;
236
+
237
+ self . matter ( ) . notify_changed ( ) ;
238
+ }
239
+ PROTO_ID_INTERACTION_MODEL => {
240
+ let dm = DataModel :: new ( handler) ;
241
+
242
+ let mut rx_status = alloc ! ( Packet :: new_rx( sx_buf) ) ;
243
+
244
+ dm. handle ( & mut exchange, & mut rx, & mut tx, & mut rx_status)
245
+ . await ?;
246
+
247
+ self . matter ( ) . notify_changed ( ) ;
248
+ }
249
+ other => {
250
+ error ! ( "Unknown Proto-ID: {}" , other) ;
251
+ }
252
+ }
253
+
254
+ Ok ( ( ) )
255
+ }
256
+
86
257
pub fn process_rx < ' r > (
87
258
& ' r self ,
88
259
construction_notification : & ' r Notification ,
0 commit comments