Skip to content

Commit 5977729

Browse files
authored
cfu-service: Add CFU buffer (#357)
Add code to buffer CFU content. Certain devices may need to do initialization or other operations during a FW update that can cause a timeout on the host. This buffer can be used to provide consistent response times until the device driver is able to catch up.
1 parent cee5a8b commit 5977729

File tree

5 files changed

+592
-2
lines changed

5 files changed

+592
-2
lines changed

cfu-service/src/buffer.rs

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
//! Module that can buffer CFU content
2+
//! This allows prompt responses to content requests even if the component is busy
3+
4+
use core::{cell::RefCell, future::pending};
5+
6+
use embassy_futures::select::{select3, Either3};
7+
use embassy_sync::channel::{DynamicReceiver, DynamicSender};
8+
use embassy_time::{with_timeout, Duration, TimeoutError};
9+
use embedded_cfu_protocol::protocol_definitions::*;
10+
use embedded_services::{
11+
cfu::{
12+
self,
13+
component::{CfuDevice, InternalResponseData, RequestData},
14+
},
15+
error, intrusive_list, trace,
16+
};
17+
18+
/// Internal state for [`Buffer`]
19+
#[derive(Copy, Clone, Default)]
20+
struct State {
21+
/// Component response that arrived outside of the timeout window
22+
pending_response: Option<InternalResponseData>,
23+
/// Whether the component is busy processing a request
24+
component_busy: bool,
25+
}
26+
27+
/// Buffer config
28+
#[derive(Copy, Clone, Default)]
29+
pub struct Config {
30+
/// Maximum amount of time to wait for a request to complete
31+
buffer_timeout: Duration,
32+
}
33+
34+
impl Config {
35+
/// Create a new config with a timeout
36+
pub fn with_timeout(timeout: Duration) -> Self {
37+
Self {
38+
buffer_timeout: timeout,
39+
}
40+
}
41+
}
42+
43+
/// CFU buffer
44+
///
45+
/// This will accept and buffer CFU content if the target component is busy.
46+
/// This means that errors that happen while processing buffered content
47+
/// may not reach the CFU host. Devices should be prepared to handle this
48+
/// and may choose to pad the end of the update with dummy data.
49+
pub struct Buffer<'a> {
50+
/// CFU device
51+
cfu_device: CfuDevice,
52+
/// Internal state
53+
state: RefCell<State>,
54+
/// Component ID to buffer requests for
55+
buffered_id: ComponentId,
56+
/// Sender for the buffer
57+
buffer_sender: DynamicSender<'a, FwUpdateContentCommand>,
58+
/// Receiver for the buffer
59+
buffer_receiver: DynamicReceiver<'a, FwUpdateContentCommand>,
60+
/// Configuration for the buffer
61+
config: Config,
62+
}
63+
64+
pub enum Event {
65+
/// Content request from the host
66+
CfuRequest(RequestData),
67+
/// Available buffered content
68+
BufferedContent(FwUpdateContentCommand),
69+
/// Response from the buffered component
70+
ComponentResponse(InternalResponseData),
71+
}
72+
73+
impl<'a> Buffer<'a> {
74+
/// Create a new Buffer
75+
///
76+
/// The buffer receives requests send to external_id and forwards them to buffered_id.
77+
pub fn new(
78+
external_id: ComponentId,
79+
buffered_id: ComponentId,
80+
buffer_sender: DynamicSender<'a, FwUpdateContentCommand>,
81+
buffer_receiver: DynamicReceiver<'a, FwUpdateContentCommand>,
82+
config: Config,
83+
) -> Self {
84+
Self {
85+
cfu_device: CfuDevice::new(external_id),
86+
state: RefCell::new(Default::default()),
87+
buffered_id,
88+
buffer_sender,
89+
buffer_receiver,
90+
config,
91+
}
92+
}
93+
94+
/// Create a new invalid FW version response
95+
fn create_invalid_fw_version_response(&self) -> InternalResponseData {
96+
let dev_inf = FwVerComponentInfo::new(FwVersion::new(0xffffffff), self.cfu_device.component_id());
97+
let comp_info: [FwVerComponentInfo; MAX_CMPT_COUNT] = [dev_inf; MAX_CMPT_COUNT];
98+
InternalResponseData::FwVersionResponse(GetFwVersionResponse {
99+
header: GetFwVersionResponseHeader::new(1, GetFwVerRespHeaderByte3::NoSpecialFlags),
100+
component_info: comp_info,
101+
})
102+
}
103+
104+
/// Create a content rejection response
105+
fn create_content_rejection(sequence: u16) -> InternalResponseData {
106+
InternalResponseData::ContentResponse(FwUpdateContentResponse::new(
107+
sequence,
108+
CfuUpdateContentResponseStatus::ErrorInvalid,
109+
))
110+
}
111+
112+
/// Process a fw version request
113+
async fn process_get_fw_version(&self) -> InternalResponseData {
114+
if let Ok(InternalResponseData::FwVersionResponse(mut response)) =
115+
cfu::route_request(self.buffered_id, RequestData::FwVersionRequest).await
116+
{
117+
// Update the component ID in the response to match our external ID
118+
response.component_info[0].component_id = self.cfu_device.component_id();
119+
InternalResponseData::FwVersionResponse(response)
120+
} else {
121+
error!("Failed to get FW version for device {}", self.buffered_id);
122+
self.create_invalid_fw_version_response()
123+
}
124+
}
125+
126+
/// Process a give offer request
127+
async fn process_give_offer(&self, offer: &FwUpdateOffer) -> InternalResponseData {
128+
let mut offer = *offer;
129+
offer.component_info.component_id = self.buffered_id;
130+
if let Ok(response @ InternalResponseData::OfferResponse(_)) =
131+
cfu::route_request(self.buffered_id, RequestData::GiveOffer(offer)).await
132+
{
133+
response
134+
} else {
135+
error!("Failed to give offer for device {}", self.buffered_id);
136+
InternalResponseData::OfferResponse(FwUpdateOfferResponse::new_with_failure(
137+
HostToken::Driver,
138+
OfferRejectReason::InvalidComponent,
139+
OfferStatus::Reject,
140+
))
141+
}
142+
}
143+
144+
/// Process update content
145+
async fn process_give_content(&self, state: &mut State, content: &FwUpdateContentCommand) -> InternalResponseData {
146+
// Clear out any pending response if this is a new FW update
147+
if content.header.flags & FW_UPDATE_FLAG_FIRST_BLOCK != 0 {
148+
state.pending_response = None;
149+
}
150+
151+
if state.component_busy {
152+
// Buffer the content if the component is busy
153+
// If the buffer is full, this will block until space is available
154+
trace!("Component is busy, buffering content");
155+
self.buffer_sender.send(*content).await;
156+
} else {
157+
// Buffered component can accept new content, send it
158+
if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(*content)).await {
159+
error!(
160+
"Failed to send content to buffered component {:?}: {:?}",
161+
self.buffered_id, e
162+
);
163+
return Self::create_content_rejection(content.header.sequence_num);
164+
}
165+
}
166+
167+
// Wait for a response from the buffered component
168+
match with_timeout(self.config.buffer_timeout, cfu::wait_device_response(self.buffered_id)).await {
169+
Err(TimeoutError) => {
170+
// Component didn't respond in time
171+
state.component_busy = true;
172+
173+
// Have most recent response from component, send that instead
174+
if let Some(response) = state.pending_response.take() {
175+
if let InternalResponseData::ContentResponse(mut response) = response {
176+
// Update the sequence number to pretend it's for this content.
177+
trace!("Using pending response: {:?}", response);
178+
response.sequence = content.header.sequence_num;
179+
InternalResponseData::ContentResponse(response)
180+
} else {
181+
// This should never happen and means that the component sent an invalid response
182+
// But send it on anyway
183+
error!("Pending response is not a content response: {:?}", response);
184+
response
185+
}
186+
} else {
187+
// Otherwise just accept the content
188+
trace!("Buffered component timed out, sending accept response");
189+
InternalResponseData::ContentResponse(FwUpdateContentResponse::new(
190+
content.header.sequence_num,
191+
CfuUpdateContentResponseStatus::Success,
192+
))
193+
}
194+
}
195+
Ok(response) => {
196+
trace!("Buffered component responded");
197+
state.component_busy = false;
198+
match response {
199+
Ok(InternalResponseData::ContentResponse(mut response)) => {
200+
response.sequence = content.header.sequence_num;
201+
InternalResponseData::ContentResponse(response)
202+
}
203+
Ok(response) => response,
204+
Err(e) => {
205+
// Couldn't get any response from the buffered component, send a rejection
206+
error!(
207+
"Failed to get response from buffered component {:?}: {:?}",
208+
self.buffered_id, e
209+
);
210+
Self::create_content_rejection(content.header.sequence_num)
211+
}
212+
}
213+
}
214+
}
215+
}
216+
217+
/// Wait for buffered content
218+
///
219+
/// If the component is busy, this will wait indefinitely since the component will not be able to process
220+
async fn wait_buffered_content(&self, is_busy: bool) -> FwUpdateContentCommand {
221+
if is_busy {
222+
let () = pending().await;
223+
unreachable!();
224+
} else {
225+
self.buffer_receiver.receive().await
226+
}
227+
}
228+
229+
/// Wait for an event
230+
pub async fn wait_event(&self) -> Event {
231+
let is_busy = self.state.borrow().component_busy;
232+
match select3(
233+
// Wait for a buffered content request
234+
self.wait_buffered_content(is_busy),
235+
// Wait for a request from the host
236+
self.cfu_device.wait_request(),
237+
// Wait for response from the buffered component
238+
cfu::wait_device_response(self.buffered_id),
239+
)
240+
.await
241+
{
242+
Either3::First(content) => {
243+
trace!("Buffered content received: {:?}", content);
244+
Event::BufferedContent(content)
245+
}
246+
Either3::Second(request) => {
247+
trace!("Request received: {:?}", request);
248+
Event::CfuRequest(request)
249+
}
250+
Either3::Third(response) => {
251+
if let Ok(response) = response {
252+
trace!("Response received: {:?}", response);
253+
Event::ComponentResponse(response)
254+
} else {
255+
error!("Failed to get response from buffered component: {:?}", response);
256+
Event::ComponentResponse(Self::create_content_rejection(0))
257+
}
258+
}
259+
}
260+
}
261+
262+
/// Top-level event processing function
263+
#[allow(clippy::await_holding_refcell_ref)]
264+
pub async fn process(&self, event: Event) -> Option<InternalResponseData> {
265+
let mut state = self.state.borrow_mut();
266+
match event {
267+
Event::CfuRequest(request) => Some(self.process_request(&mut state, request).await),
268+
Event::BufferedContent(content) => {
269+
// Send the buffered content to the component
270+
// Don't need to wait for a response here, the response will be caught later by either [`wait_event`] or [`process_give_content`]
271+
if let Err(e) = cfu::send_device_request(self.buffered_id, RequestData::GiveContent(content)).await {
272+
error!(
273+
"Failed to send content to buffered component {:?}: {:?}",
274+
self.buffered_id, e
275+
);
276+
Some(Self::create_content_rejection(content.header.sequence_num))
277+
} else {
278+
state.component_busy = true;
279+
None
280+
}
281+
}
282+
Event::ComponentResponse(response) => {
283+
// Store the response for the next content request
284+
state.pending_response = Some(response);
285+
state.component_busy = false;
286+
None
287+
}
288+
}
289+
}
290+
291+
/// Process a CFU message and produce a response
292+
async fn process_request(&self, state: &mut State, request: RequestData) -> InternalResponseData {
293+
match request {
294+
RequestData::FwVersionRequest => {
295+
trace!("Got FwVersionRequest");
296+
self.process_get_fw_version().await
297+
}
298+
RequestData::GiveOffer(offer) => {
299+
trace!("Got GiveOffer");
300+
self.process_give_offer(&offer).await
301+
}
302+
RequestData::GiveContent(content) => {
303+
trace!("Got GiveContent");
304+
self.process_give_content(state, &content).await
305+
}
306+
RequestData::FinalizeUpdate => {
307+
trace!("Got FinalizeUpdate");
308+
InternalResponseData::ComponentPrepared
309+
}
310+
RequestData::PrepareComponentForUpdate => {
311+
trace!("Got PrepareComponentForUpdate");
312+
InternalResponseData::ComponentPrepared
313+
}
314+
}
315+
}
316+
317+
/// Send a response to the CFU message
318+
pub async fn send_response(&self, response: InternalResponseData) {
319+
self.cfu_device.send_response(response).await;
320+
}
321+
322+
/// Register the buffer with all relevant services
323+
pub async fn register(&'static self) -> Result<(), intrusive_list::Error> {
324+
cfu::register_device(&self.cfu_device).await
325+
}
326+
}

cfu-service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use embedded_services::cfu::component::*;
66
use embedded_services::cfu::{CfuError, ContextToken};
77
use embedded_services::{comms, error, info};
88

9+
pub mod buffer;
910
pub mod host;
1011
pub mod splitter;
1112

embedded-service/src/cfu/component.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,16 @@ impl CfuDevice {
144144
pub async fn state(&self) -> InternalState {
145145
*self.state.lock().await
146146
}
147+
148+
/// Send a request to this device
149+
pub async fn send_request(&self, request: RequestData) {
150+
self.request.send(request).await;
151+
}
152+
147153
/// Sends a request to this device and returns a response
148154
pub async fn execute_device_request(&self, request: RequestData) -> Result<InternalResponseData, CfuProtocolError> {
149-
self.request.send(request).await;
150-
Ok(self.response.receive().await)
155+
self.send_request(request).await;
156+
Ok(self.wait_response().await)
151157
}
152158

153159
/// Wait for a request
@@ -159,6 +165,11 @@ impl CfuDevice {
159165
pub async fn send_response(&self, response: InternalResponseData) {
160166
self.response.send(response).await;
161167
}
168+
169+
/// Waits for a response
170+
pub async fn wait_response(&self) -> InternalResponseData {
171+
self.response.receive().await
172+
}
162173
}
163174

164175
/// Example for CFU Component

embedded-service/src/cfu/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,25 @@ pub async fn route_request(to: ComponentId, request: RequestData) -> Result<Inte
114114
.map_err(CfuError::ProtocolError)
115115
}
116116

117+
/// Send a request to the specific CFU device, but don't wait for a response
118+
pub async fn send_device_request(to: ComponentId, request: RequestData) -> Result<(), CfuError> {
119+
let device = get_device(to).await;
120+
if device.is_none() {
121+
return Err(CfuError::InvalidComponent);
122+
}
123+
device.unwrap().send_request(request).await;
124+
Ok(())
125+
}
126+
127+
/// Wait for a response from the specific CFU device
128+
pub async fn wait_device_response(to: ComponentId) -> Result<InternalResponseData, CfuError> {
129+
let device = get_device(to).await;
130+
if device.is_none() {
131+
return Err(CfuError::InvalidComponent);
132+
}
133+
Ok(device.unwrap().wait_response().await)
134+
}
135+
117136
/// Singleton struct to give access to the cfu client context
118137
pub struct ContextToken(());
119138

0 commit comments

Comments
 (0)