Skip to content

Commit b869c29

Browse files
committed
refactor CFU to use deferred IPC channel
This is a BREAKING CHANGE which updates embedded-service::cfu's CfuDevice from utilizing a dual-channel model for communication to a deferred IPC channel. Cfu-service, type-c service, and the examples were updated to reflect this change.
1 parent 2b82aaf commit b869c29

File tree

9 files changed

+249
-251
lines changed

9 files changed

+249
-251
lines changed

cfu-service/src/buffer.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use embedded_cfu_protocol::protocol_definitions::*;
1010
use embedded_services::{
1111
cfu::{
1212
self,
13-
component::{CfuDevice, InternalResponseData, RequestData},
13+
component::{CfuDevice, InternalResponse, InternalResponseData, RequestData},
1414
},
15-
error, intrusive_list, trace,
15+
error, intrusive_list,
16+
ipc::deferred::RequestId,
17+
trace,
1618
};
1719

1820
/// Internal state for [`Buffer`]
@@ -148,24 +150,37 @@ impl<'a> Buffer<'a> {
148150
state.pending_response = None;
149151
}
150152

153+
let mut have_id = false;
154+
let mut req_id: RequestId = RequestId::default();
151155
if state.component_busy {
152156
// Buffer the content if the component is busy
153157
// If the buffer is full, this will block until space is available
154158
trace!("Component is busy, buffering content");
155159
self.buffer_sender.send(*content).await;
156160
} else {
157161
// Buffered component can accept new content, send it
158-
if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await {
159-
error!(
160-
"Failed to send content to buffered component {:?}: {:?}",
161-
self.buffered_id, e
162-
);
163-
return Self::create_content_rejection(content.header.sequence_num);
162+
match cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await {
163+
Ok(id) => {
164+
have_id = true;
165+
req_id = id;
166+
}
167+
Err(e) => {
168+
error!(
169+
"Failed to send content to buffered component {:?}: {:?}",
170+
self.buffered_id, e
171+
);
172+
return Self::create_content_rejection(content.header.sequence_num);
173+
}
164174
}
165175
}
166176

167177
// Wait for a response from the buffered component
168-
match with_timeout(self.config.buffer_timeout, cfu::wait_device_response(self.buffered_id)).await {
178+
let fut = if have_id {
179+
cfu::wait_device_response(self.buffered_id, Some(req_id))
180+
} else {
181+
cfu::wait_device_response(self.buffered_id, None)
182+
};
183+
match with_timeout(self.config.buffer_timeout, fut).await {
169184
Err(TimeoutError) => {
170185
// Component didn't respond in time
171186
state.component_busy = true;
@@ -195,7 +210,7 @@ impl<'a> Buffer<'a> {
195210
Ok(response) => {
196211
trace!("Buffered component responded");
197212
state.component_busy = false;
198-
match response {
213+
match response.unwrap() {
199214
Ok(InternalResponseData::ContentResponse(mut response)) => {
200215
response.sequence = content.header.sequence_num;
201216
InternalResponseData::ContentResponse(response)
@@ -233,9 +248,9 @@ impl<'a> Buffer<'a> {
233248
// Wait for a buffered content request
234249
self.wait_buffered_content(is_busy),
235250
// Wait for a request from the host
236-
self.cfu_device.wait_request(),
251+
self.cfu_device.receive(),
237252
// Wait for response from the buffered component
238-
cfu::wait_device_response(self.buffered_id),
253+
cfu::wait_device_response(self.buffered_id, None),
239254
)
240255
.await
241256
{
@@ -244,11 +259,11 @@ impl<'a> Buffer<'a> {
244259
Event::BufferedContent(content)
245260
}
246261
Either3::Second(request) => {
247-
trace!("Request received: {:?}", request);
248-
Event::CfuRequest(request)
262+
trace!("Request received: {:?}", request.command);
263+
Event::CfuRequest(request.command.data)
249264
}
250265
Either3::Third(response) => {
251-
if let Ok(response) = response {
266+
if let Ok(Ok(response)) = response {
252267
trace!("Response received: {:?}", response);
253268
Event::ComponentResponse(response)
254269
} else {
@@ -315,8 +330,8 @@ impl<'a> Buffer<'a> {
315330
}
316331

317332
/// Send a response to the CFU message
318-
pub async fn send_response(&self, response: InternalResponseData) {
319-
self.cfu_device.send_response(response).await;
333+
pub async fn send_response(&self, response: InternalResponse, request_id: RequestId) {
334+
self.cfu_device.send_response(request_id, response).await
320335
}
321336

322337
/// Register the buffer with all relevant services

cfu-service/src/lib.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,16 @@ impl CfuClient {
2828
tp: comms::Endpoint::uninit(comms::EndpointID::Internal(comms::Internal::Nonvol)),
2929
})
3030
}
31-
pub async fn process_request(&self) -> Result<(), CfuError> {
32-
let request = self.context.wait_request().await;
31+
pub async fn process_request(&self) -> InternalResponse {
32+
let request = self.context.receive().await;
3333
//let device = self.context.get_device(request.id).await?;
34-
let comp = request.id;
34+
let comp = request.command.id;
3535

36-
match request.data {
36+
match request.command.data {
3737
RequestData::FwVersionRequest => {
3838
info!("Received FwVersionRequest, comp {}", comp);
3939
if let Ok(device) = self.context.get_device(comp).await {
40-
let resp = device
41-
.execute_device_request(request.data)
42-
.await
43-
.map_err(CfuError::ProtocolError)?;
40+
let resp = device.execute_device_request(request.command).await?;
4441

4542
// TODO replace with signal to component to get its own fw version
4643
//cfu::send_request(comp, RequestData::FwVersionRequest).await?;
@@ -54,15 +51,18 @@ impl CfuClient {
5451
return Err(CfuError::ProtocolError(CfuProtocolError::BadResponse));
5552
}
5653
}
57-
self.context.send_response(resp).await;
58-
return Ok(());
54+
return Ok(resp);
5955
}
6056
Err(CfuError::InvalidComponent)
6157
}
62-
RequestData::GiveContent(_content_cmd) => Ok(()),
63-
RequestData::GiveOffer(_offer_cmd) => Ok(()),
64-
RequestData::PrepareComponentForUpdate => Ok(()),
65-
RequestData::FinalizeUpdate => Ok(()),
58+
RequestData::GiveContent(_content_cmd) => {
59+
Ok(InternalResponseData::ContentResponse(FwUpdateContentResponse::default()))
60+
}
61+
RequestData::GiveOffer(_offer_cmd) => {
62+
Ok(InternalResponseData::OfferResponse(FwUpdateOfferResponse::default()))
63+
}
64+
RequestData::PrepareComponentForUpdate => Ok(InternalResponseData::ComponentPrepared),
65+
RequestData::FinalizeUpdate => Ok(InternalResponseData::ComponentPrepared),
6666
}
6767
}
6868
}

cfu-service/src/splitter.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ use embedded_cfu_protocol::protocol_definitions::*;
88
use embedded_services::{
99
cfu::{
1010
self,
11-
component::{CfuDevice, InternalResponseData, RequestData},
11+
component::{CfuDevice, InternalResponseData, Request, RequestData},
1212
},
13-
error, intrusive_list, trace,
13+
error, intrusive_list,
14+
ipc::deferred,
15+
trace,
1416
};
1517

1618
/// Trait containing customization functionality for [`Splitter`]
@@ -156,8 +158,8 @@ impl<'a, C: Customization> Splitter<'a, C> {
156158
}
157159

158160
/// Wait for a CFU message
159-
pub async fn wait_request(&self) -> RequestData {
160-
self.cfu_device.wait_request().await
161+
pub async fn wait_request(&self) -> Request {
162+
self.cfu_device.receive().await.command
161163
}
162164

163165
/// Process a CFU message and produce a response
@@ -187,8 +189,8 @@ impl<'a, C: Customization> Splitter<'a, C> {
187189
}
188190

189191
/// Send a response to the CFU message
190-
pub async fn send_response(&self, response: InternalResponseData) {
191-
self.cfu_device.send_response(response).await;
192+
pub async fn send_response(&self, response: InternalResponseData, request_id: deferred::RequestId) {
193+
self.cfu_device.send_response(request_id, Ok(response)).await;
192194
}
193195

194196
pub async fn register(&'static self) -> Result<(), intrusive_list::Error> {

embedded-service/src/cfu/component.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
//! Device struct and methods for component communication
22
use core::future::Future;
33

4+
#[cfg(feature = "defmt")]
5+
use defmt::error;
6+
47
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
5-
use embassy_sync::channel::Channel;
68
use embassy_sync::mutex::Mutex;
79
use embedded_cfu_protocol::components::{CfuComponentInfo, CfuComponentStorage, CfuComponentTraits};
810
use embedded_cfu_protocol::protocol_definitions::*;
@@ -12,6 +14,7 @@ use heapless::Vec;
1214
use super::CfuError;
1315
use crate::cfu::route_request;
1416
use crate::intrusive_list;
17+
use crate::ipc::deferred;
1518

1619
/// Component internal update state
1720
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -57,6 +60,16 @@ impl Default for InternalState {
5760
}
5861
}
5962

63+
/// Request to the cfu service
64+
#[derive(Debug, PartialEq)]
65+
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
66+
pub struct Request {
67+
/// Component that sent this request
68+
pub id: ComponentId,
69+
/// Request data
70+
pub data: RequestData,
71+
}
72+
6073
/// CFU Request types and necessary data
6174
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6275
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
@@ -93,17 +106,16 @@ pub enum InternalResponseData {
93106
ComponentPrepared,
94107
}
95108

96-
/// Channel size for device requests
97-
pub const DEVICE_CHANNEL_SIZE: usize = 1;
109+
/// Wrapper type to make code cleaner
110+
pub type InternalResponse = Result<InternalResponseData, CfuError>;
98111

99112
/// CfuDevice struct
100113
/// Can be inserted in an intrusive-list+
101114
pub struct CfuDevice {
102115
node: intrusive_list::Node,
103116
component_id: ComponentId,
104117
state: Mutex<NoopRawMutex, InternalState>,
105-
request: Channel<NoopRawMutex, RequestData, DEVICE_CHANNEL_SIZE>,
106-
response: Channel<NoopRawMutex, InternalResponseData, DEVICE_CHANNEL_SIZE>,
118+
request: deferred::Channel<NoopRawMutex, Request, InternalResponse>,
107119
}
108120

109121
impl intrusive_list::NodeContainer for CfuDevice {
@@ -124,15 +136,17 @@ impl CfuDeviceContainer for CfuDevice {
124136
}
125137
}
126138

139+
/// Convenience type for CFU request
140+
pub type CfuRequest<'a> = deferred::Request<'a, NoopRawMutex, Request, InternalResponse>;
141+
127142
impl CfuDevice {
128143
/// Constructor for CfuDevice
129144
pub fn new(component_id: ComponentId) -> Self {
130145
Self {
131146
node: intrusive_list::Node::uninit(),
132147
component_id,
133148
state: Mutex::new(InternalState::default()),
134-
request: Channel::new(),
135-
response: Channel::new(),
149+
request: deferred::Channel::new(),
136150
}
137151
}
138152
/// Getter for component id
@@ -145,30 +159,34 @@ impl CfuDevice {
145159
*self.state.lock().await
146160
}
147161

148-
/// Send a request to this device
149-
pub async fn send_request(&self, request: RequestData) {
150-
self.request.send(request).await;
151-
}
152-
153162
/// Sends a request to this device and returns a response
154-
pub async fn execute_device_request(&self, request: RequestData) -> Result<InternalResponseData, CfuProtocolError> {
155-
self.send_request(request).await;
156-
Ok(self.wait_response().await)
163+
pub async fn execute_device_request(&self, request: Request) -> InternalResponse {
164+
self.request.execute(request).await
157165
}
158166

159-
/// Wait for a request
160-
pub async fn wait_request(&self) -> RequestData {
167+
/// Send a response
168+
pub async fn receive(&self) -> CfuRequest {
161169
self.request.receive().await
162170
}
163171

164-
/// Send a response
165-
pub async fn send_response(&self, response: InternalResponseData) {
166-
self.response.send(response).await;
172+
/// Send a request to the device, returns a request ID for later tracking if needed
173+
pub async fn send_request(&self, request: Request) -> deferred::RequestId {
174+
self.request.send_command(request).await
175+
}
176+
177+
/// Wait for a response for a specific request ID
178+
pub async fn wait_response(&self, id: deferred::RequestId) -> InternalResponse {
179+
self.request.wait_for_response(id).await
167180
}
168181

169-
/// Waits for a response
170-
pub async fn wait_response(&self) -> InternalResponseData {
171-
self.response.receive().await
182+
/// Wait for the next response, regardless of request ID
183+
pub async fn wait_any_response(&self) -> InternalResponse {
184+
self.request.wait_for_next_response().await
185+
}
186+
187+
/// Send a response
188+
pub async fn send_response(&self, id: deferred::RequestId, response: InternalResponse) {
189+
self.request.send_response(response, id).await
172190
}
173191
}
174192

@@ -213,7 +231,8 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
213231
}
214232
/// wait for a request and process it
215233
pub async fn process_request(&self) -> Result<(), CfuError> {
216-
match self.device.wait_request().await {
234+
let request = self.device.receive().await;
235+
match request.command.data {
217236
RequestData::FwVersionRequest => {
218237
let fwv = self.get_fw_version().await.map_err(CfuError::ProtocolError)?;
219238
let dev_inf = FwVerComponentInfo::new(fwv, self.get_component_id());
@@ -250,9 +269,7 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
250269
),
251270
component_info: comp_info,
252271
};
253-
self.device
254-
.send_response(InternalResponseData::FwVersionResponse(resp))
255-
.await;
272+
request.respond(Ok(InternalResponseData::FwVersionResponse(resp)));
256273
}
257274
RequestData::PrepareComponentForUpdate => {
258275
self.storage_prepare()
@@ -263,9 +280,7 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
263280
// accept any and all offers regardless of what version it is
264281
if buf.component_info.component_id == self.get_component_id() {
265282
let resp = FwUpdateOfferResponse::new_accept(HostToken::Driver);
266-
self.device
267-
.send_response(InternalResponseData::OfferResponse(resp))
268-
.await;
283+
request.respond(Ok(InternalResponseData::OfferResponse(resp)));
269284
}
270285
}
271286
RequestData::GiveContent(buf) => {
@@ -276,8 +291,14 @@ impl<W: CfuWriter> CfuComponentDefault<W> {
276291
.cfu_write(Some(offset), &buf.data)
277292
.await
278293
.map_err(|e| CfuError::ProtocolError(CfuProtocolError::WriterError(e)))?;
294+
request.respond(Ok(InternalResponseData::ContentResponse(FwUpdateContentResponse::new(
295+
buf.header.sequence_num,
296+
CfuUpdateContentResponseStatus::Success,
297+
))));
298+
}
299+
RequestData::FinalizeUpdate => {
300+
request.respond(Ok(InternalResponseData::ComponentPrepared));
279301
}
280-
RequestData::FinalizeUpdate => {}
281302
}
282303
Ok(())
283304
}

0 commit comments

Comments
 (0)