diff --git a/Cargo.lock b/Cargo.lock index 01f7d335..e9de1e6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,6 +119,17 @@ dependencies = [ "winnow 0.7.13", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -966,6 +977,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_logger" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1123,6 +1147,15 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + [[package]] name = "hid-service" version = "0.1.0" @@ -1136,6 +1169,12 @@ dependencies = [ "log", ] +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "ident_case" version = "1.0.1" @@ -1680,13 +1719,17 @@ dependencies = [ name = "power-policy-service" version = "0.1.0" dependencies = [ + "critical-section", "defmt 0.3.100", "embassy-executor", "embassy-futures", "embassy-sync", "embassy-time", "embedded-services", + "env_logger", "log", + "static_cell", + "tokio", ] [[package]] @@ -2055,6 +2098,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thermal-service" version = "0.1.0" @@ -2389,6 +2441,37 @@ version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows" version = "0.61.3" diff --git a/embedded-service/Cargo.toml b/embedded-service/Cargo.toml index 8b5fc7bb..b93a1159 100644 --- a/embedded-service/Cargo.toml +++ b/embedded-service/Cargo.toml @@ -21,6 +21,7 @@ defmt = { workspace = true, optional = true } document-features.workspace = true embassy-sync.workspace = true embassy-time.workspace = true +embassy-futures.workspace = true embedded-batteries-async.workspace = true embedded-cfu-protocol.workspace = true embedded-hal-async.workspace = true @@ -49,7 +50,6 @@ cortex-m.workspace = true [dev-dependencies] critical-section = { workspace = true, features = ["std"] } -embassy-futures.workspace = true embassy-sync = { workspace = true, features = ["std"] } embassy-time = { workspace = true, features = ["std"] } embassy-time-driver = { workspace = true } diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs new file mode 100644 index 00000000..20675ae6 --- /dev/null +++ b/embedded-service/src/event.rs @@ -0,0 +1,43 @@ +//! Common traits for event senders and receivers + +use embassy_sync::channel::{DynamicReceiver, DynamicSender}; + +/// Common event sender trait +pub trait Sender { + /// Attempt to send an event + /// + /// Return none if the event cannot currently be sent + fn try_send(&mut self, event: E) -> Option<()>; + /// Send an event + fn send(&mut self, event: E) -> impl Future; +} + +/// Common event receiver trait +pub trait Receiver { + /// Attempt to receive an event + /// + /// Return none if there are no pending events + fn try_next(&mut self) -> Option; + /// Receiver an event + fn wait_next(&mut self) -> impl Future; +} + +impl Sender for DynamicSender<'_, E> { + fn try_send(&mut self, event: E) -> Option<()> { + DynamicSender::try_send(self, event).ok() + } + + fn send(&mut self, event: E) -> impl Future { + DynamicSender::send(self, event) + } +} + +impl Receiver for DynamicReceiver<'_, E> { + fn try_next(&mut self) -> Option { + self.try_receive().ok() + } + + fn wait_next(&mut self) -> impl Future { + self.receive() + } +} diff --git a/embedded-service/src/lib.rs b/embedded-service/src/lib.rs index 560e1df8..de5882be 100644 --- a/embedded-service/src/lib.rs +++ b/embedded-service/src/lib.rs @@ -17,6 +17,7 @@ pub mod buffer; pub mod cfu; pub mod comms; pub mod ec_type; +pub mod event; pub mod fmt; pub mod hid; pub mod init; diff --git a/embedded-service/src/power/policy/action/device.rs b/embedded-service/src/power/policy/action/device.rs deleted file mode 100644 index 01d0d6c3..00000000 --- a/embedded-service/src/power/policy/action/device.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! Device state machine actions -use super::*; -use crate::power::policy::{ConsumerPowerCapability, Error, ProviderPowerCapability, device, policy}; -use crate::{info, trace}; - -/// Device state machine control -pub struct Device<'a, S: Kind> { - device: &'a device::Device, - _state: core::marker::PhantomData, -} - -/// Enum to contain any state -pub enum AnyState<'a> { - /// Detached - Detached(Device<'a, Detached>), - /// Idle - Idle(Device<'a, Idle>), - /// Connected Consumer - ConnectedConsumer(Device<'a, ConnectedConsumer>), - /// Connected Provider - ConnectedProvider(Device<'a, ConnectedProvider>), -} - -impl AnyState<'_> { - /// Return the kind of the contained state - pub fn kind(&self) -> StateKind { - match self { - AnyState::Detached(_) => StateKind::Detached, - AnyState::Idle(_) => StateKind::Idle, - AnyState::ConnectedConsumer(_) => StateKind::ConnectedConsumer, - AnyState::ConnectedProvider(_) => StateKind::ConnectedProvider, - } - } -} - -impl<'a, S: Kind> Device<'a, S> { - /// Create a new state machine - pub(crate) fn new(device: &'a device::Device) -> Self { - Self { - device, - _state: core::marker::PhantomData, - } - } - - /// Detach the device - pub async fn detach(self) -> Result, Error> { - info!("Received detach from device {}", self.device.id().0); - self.device.set_state(device::State::Detached).await; - self.device.update_consumer_capability(None).await; - self.device.update_requested_provider_capability(None).await; - policy::send_request(self.device.id(), policy::RequestData::NotifyDetached) - .await? - .complete_or_err()?; - Ok(Device::new(self.device)) - } - - /// Disconnect this device - async fn disconnect_internal(&self) -> Result<(), Error> { - info!("Device {} disconnecting", self.device.id().0); - self.device.update_consumer_capability(None).await; - self.device.update_requested_provider_capability(None).await; - self.device.set_state(device::State::Idle).await; - policy::send_request(self.device.id(), policy::RequestData::NotifyDisconnect) - .await? - .complete_or_err() - } - - /// Notify the power policy service of an updated consumer power capability - async fn notify_consumer_power_capability_internal( - &self, - capability: Option, - ) -> Result<(), Error> { - info!( - "Device {} consume capability updated: {:#?}", - self.device.id().0, - capability - ); - self.device.update_consumer_capability(capability).await; - policy::send_request( - self.device.id(), - policy::RequestData::NotifyConsumerCapability(capability), - ) - .await? - .complete_or_err() - } - - /// Request the given power from the power policy service - async fn request_provider_power_capability_internal( - &self, - capability: ProviderPowerCapability, - ) -> Result<(), Error> { - if self.device.provider_capability().await == Some(capability) { - // Already operating at this capability, power policy is already aware, don't need to do anything - trace!("Device {} already requested: {:#?}", self.device.id().0, capability); - return Ok(()); - } - - info!("Request provide from device {}, {:#?}", self.device.id().0, capability); - self.device.update_requested_provider_capability(Some(capability)).await; - policy::send_request( - self.device.id(), - policy::RequestData::RequestProviderCapability(capability), - ) - .await? - .complete_or_err()?; - Ok(()) - } -} - -impl<'a> Device<'a, Detached> { - /// Attach the device - pub async fn attach(self) -> Result, Error> { - info!("Received attach from device {}", self.device.id().0); - self.device.set_state(device::State::Idle).await; - policy::send_request(self.device.id(), policy::RequestData::NotifyAttached) - .await? - .complete_or_err()?; - Ok(Device::new(self.device)) - } -} - -impl Device<'_, Idle> { - /// Notify the power policy service of an updated consumer power capability - pub async fn notify_consumer_power_capability( - &self, - capability: Option, - ) -> Result<(), Error> { - self.notify_consumer_power_capability_internal(capability).await - } - - /// Request the given power from the power policy service - pub async fn request_provider_power_capability(&self, capability: ProviderPowerCapability) -> Result<(), Error> { - self.request_provider_power_capability_internal(capability).await - } -} - -impl<'a> Device<'a, ConnectedConsumer> { - /// Disconnect this device - pub async fn disconnect(self) -> Result, Error> { - self.disconnect_internal().await?; - Ok(Device::new(self.device)) - } - - /// Notify the power policy service of an updated consumer power capability - pub async fn notify_consumer_power_capability( - &self, - capability: Option, - ) -> Result<(), Error> { - self.notify_consumer_power_capability_internal(capability).await - } -} - -impl<'a> Device<'a, ConnectedProvider> { - /// Disconnect this device - pub async fn disconnect(self) -> Result, Error> { - self.disconnect_internal().await?; - Ok(Device::new(self.device)) - } - - /// Request the given power from the power policy service - pub async fn request_provider_power_capability(&self, capability: ProviderPowerCapability) -> Result<(), Error> { - self.request_provider_power_capability_internal(capability).await - } - - /// Notify the power policy service of an updated consumer power capability - pub async fn notify_consumer_power_capability( - &self, - capability: Option, - ) -> Result<(), Error> { - self.notify_consumer_power_capability_internal(capability).await - } -} diff --git a/embedded-service/src/power/policy/action/mod.rs b/embedded-service/src/power/policy/action/mod.rs deleted file mode 100644 index 889dccf7..00000000 --- a/embedded-service/src/power/policy/action/mod.rs +++ /dev/null @@ -1,51 +0,0 @@ -//! Power policy actions -//! This modules contains wrapper structs that use type states to enforce the valid actions for each device state -use super::device::StateKind; - -pub mod device; -pub mod policy; - -trait Sealed {} - -/// Trait to provide the kind of a state type -#[allow(private_bounds)] -pub trait Kind: Sealed { - /// Return the kind of a state type - fn kind() -> StateKind; -} - -/// State type for a detached device -pub struct Detached; -impl Sealed for Detached {} -impl Kind for Detached { - fn kind() -> StateKind { - StateKind::Detached - } -} - -/// State type for an attached device -pub struct Idle; -impl Sealed for Idle {} -impl Kind for Idle { - fn kind() -> StateKind { - StateKind::Idle - } -} - -/// State type for a device that is providing power -pub struct ConnectedProvider; -impl Sealed for ConnectedProvider {} -impl Kind for ConnectedProvider { - fn kind() -> StateKind { - StateKind::ConnectedProvider - } -} - -/// State type for a device that is consuming power -pub struct ConnectedConsumer; -impl Sealed for ConnectedConsumer {} -impl Kind for ConnectedConsumer { - fn kind() -> StateKind { - StateKind::ConnectedConsumer - } -} diff --git a/embedded-service/src/power/policy/action/policy.rs b/embedded-service/src/power/policy/action/policy.rs deleted file mode 100644 index dcd46787..00000000 --- a/embedded-service/src/power/policy/action/policy.rs +++ /dev/null @@ -1,238 +0,0 @@ -//! Policy state machine -use embassy_time::{Duration, TimeoutError, with_timeout}; - -use super::*; -use crate::power::policy::{ConsumerPowerCapability, Error, ProviderPowerCapability, device}; -use crate::{error, info}; - -/// Default timeout for device commands to prevent the policy from getting stuck -const DEFAULT_TIMEOUT: Duration = Duration::from_millis(5000); - -/// Policy state machine control -pub struct Policy<'a, S: Kind> { - device: &'a device::Device, - _state: core::marker::PhantomData, -} - -/// Enum to contain any state -pub enum AnyState<'a> { - /// Detached - Detached(Policy<'a, Detached>), - /// Idle - Idle(Policy<'a, Idle>), - /// Connected Consumer - ConnectedConsumer(Policy<'a, ConnectedConsumer>), - /// Connected Provider - ConnectedProvider(Policy<'a, ConnectedProvider>), -} - -impl AnyState<'_> { - /// Return the kind of the contained state - pub fn kind(&self) -> StateKind { - match self { - AnyState::Detached(_) => StateKind::Detached, - AnyState::Idle(_) => StateKind::Idle, - AnyState::ConnectedConsumer(_) => StateKind::ConnectedConsumer, - AnyState::ConnectedProvider(_) => StateKind::ConnectedProvider, - } - } -} - -impl<'a, S: Kind> Policy<'a, S> { - /// Create a new state machine - pub(crate) fn new(device: &'a device::Device) -> Self { - Self { - device, - _state: core::marker::PhantomData, - } - } - - /// Common disconnect function used by multiple states - async fn disconnect_internal_no_timeout(&self) -> Result<(), Error> { - info!("Device {} got disconnect request", self.device.id().0); - self.device - .execute_device_command(device::CommandData::Disconnect) - .await? - .complete_or_err()?; - self.device.set_state(device::State::Idle).await; - Ok(()) - } - - /// Common disconnect function used by multiple states - async fn disconnect_internal(&self) -> Result<(), Error> { - match with_timeout(DEFAULT_TIMEOUT, self.disconnect_internal_no_timeout()).await { - Ok(r) => r, - Err(TimeoutError) => Err(Error::Timeout), - } - } - - /// Common connect as provider function used by multiple states - async fn connect_as_provider_internal_no_timeout(&self, capability: ProviderPowerCapability) -> Result<(), Error> { - info!("Device {} connecting provider", self.device.id().0); - - self.device - .execute_device_command(device::CommandData::ConnectAsProvider(capability)) - .await? - .complete_or_err()?; - - self.device - .set_state(device::State::ConnectedProvider(capability)) - .await; - - Ok(()) - } - - /// Common connect provider function used by multiple states - async fn connect_provider_internal(&self, capability: ProviderPowerCapability) -> Result<(), Error> { - match with_timeout( - DEFAULT_TIMEOUT, - self.connect_as_provider_internal_no_timeout(capability), - ) - .await - { - Ok(r) => r, - Err(TimeoutError) => Err(Error::Timeout), - } - } -} - -// The policy can do nothing when no device is attached -impl Policy<'_, Detached> {} - -impl<'a> Policy<'a, Idle> { - /// Connect this device as a consumer - pub async fn connect_as_consumer_no_timeout( - self, - capability: ConsumerPowerCapability, - ) -> Result, Error> { - info!("Device {} connecting as consumer", self.device.id().0); - - self.device - .execute_device_command(device::CommandData::ConnectAsConsumer(capability)) - .await? - .complete_or_err()?; - - self.device - .set_state(device::State::ConnectedConsumer(capability)) - .await; - Ok(Policy::new(self.device)) - } - - /// Connect this device as a consumer - pub async fn connect_consumer( - self, - capability: ConsumerPowerCapability, - ) -> Result, Error> { - match with_timeout(DEFAULT_TIMEOUT, self.connect_as_consumer_no_timeout(capability)).await { - Ok(r) => r, - Err(TimeoutError) => Err(Error::Timeout), - } - } - - /// Connect this device as a provider - pub async fn connect_provider_no_timeout( - self, - capability: ProviderPowerCapability, - ) -> Result, Error> { - self.connect_as_provider_internal_no_timeout(capability) - .await - .map(|_| Policy::new(self.device)) - } - - /// Connect this device as a provider - pub async fn connect_provider( - self, - capability: ProviderPowerCapability, - ) -> Result, Error> { - self.connect_provider_internal(capability) - .await - .map(|_| Policy::new(self.device)) - } -} - -impl<'a> Policy<'a, ConnectedConsumer> { - /// Disconnect this device - pub async fn disconnect_no_timeout(self) -> Result, Error> { - self.disconnect_internal_no_timeout() - .await - .map(|_| Policy::new(self.device)) - } - - /// Disconnect this device - pub async fn disconnect(self) -> Result, Error> { - self.disconnect_internal().await.map(|_| Policy::new(self.device)) - } -} - -impl<'a> Policy<'a, ConnectedProvider> { - /// Disconnect this device - pub async fn disconnect_no_timeout(self) -> Result, Error> { - if let Err(e) = self.disconnect_internal_no_timeout().await { - error!("Error disconnecting device {}: {:?}", self.device.id().0, e); - return Err(e); - } - Ok(Policy::new(self.device)) - } - - /// Disconnect this device - pub async fn disconnect(self) -> Result, Error> { - match with_timeout(DEFAULT_TIMEOUT, self.disconnect_no_timeout()).await { - Ok(r) => r, - Err(TimeoutError) => Err(Error::Timeout), - } - } - - /// Connect this device as a consumer - pub async fn connect_as_consumer_no_timeout( - self, - capability: ConsumerPowerCapability, - ) -> Result, Error> { - info!("Device {} connecting as consumer", self.device.id().0); - - self.device - .execute_device_command(device::CommandData::ConnectAsConsumer(capability)) - .await? - .complete_or_err()?; - - self.device - .set_state(device::State::ConnectedConsumer(capability)) - .await; - Ok(Policy::new(self.device)) - } - - /// Connect this device as a consumer - pub async fn connect_consumer( - self, - capability: ConsumerPowerCapability, - ) -> Result, Error> { - match with_timeout(DEFAULT_TIMEOUT, self.connect_as_consumer_no_timeout(capability)).await { - Ok(r) => r, - Err(TimeoutError) => Err(Error::Timeout), - } - } - - /// Connect this device as a provider - pub async fn connect_provider_no_timeout( - &self, - capability: ProviderPowerCapability, - ) -> Result, Error> { - self.connect_as_provider_internal_no_timeout(capability) - .await - .map(|_| Policy::new(self.device)) - } - - /// Connect this device as a provider - pub async fn connect_provider( - &self, - capability: ProviderPowerCapability, - ) -> Result, Error> { - self.connect_provider_internal(capability) - .await - .map(|_| Policy::new(self.device)) - } - - /// Get the provider power capability of this device - pub async fn power_capability(&self) -> ProviderPowerCapability { - self.device.provider_capability().await.unwrap() - } -} diff --git a/embedded-service/src/power/policy/device.rs b/embedded-service/src/power/policy/device.rs index 23a6cc05..545d868c 100644 --- a/embedded-service/src/power/policy/device.rs +++ b/embedded-service/src/power/policy/device.rs @@ -1,11 +1,11 @@ //! Device struct and methods -use core::ops::DerefMut; - use embassy_sync::mutex::Mutex; -use super::{DeviceId, Error, action}; -use crate::ipc::deferred; +use super::{DeviceId, Error}; +use crate::event::Receiver; +use crate::power::policy::policy::RequestData; use crate::power::policy::{ConsumerPowerCapability, ProviderPowerCapability}; +use crate::sync::Lockable; use crate::{GlobalRawMutex, intrusive_list}; /// Most basic device states @@ -48,16 +48,164 @@ impl State { } } -/// Internal device state for power policy +/// Per-device state for power policy implementation +/// +/// This struct implements the state machine outlined in the docs directory. +/// The various state transition functions always succeed in the sense that +/// the desired state is always entered, but some still return a result. +/// This is because a the device that is driving this state machine is the +/// ultimate source of truth and the recovery procedure would ultimately +/// end up catching up to this state anyway. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] -struct InternalState { +pub struct InternalState { /// Current state of the device - pub state: State, + state: State, /// Current consumer capability - pub consumer_capability: Option, + consumer_capability: Option, /// Current requested provider capability - pub requested_provider_capability: Option, + requested_provider_capability: Option, +} + +impl Default for InternalState { + fn default() -> Self { + Self { + state: State::Detached, + consumer_capability: None, + requested_provider_capability: None, + } + } +} + +impl InternalState { + /// Attach the device + pub fn attach(&mut self) -> Result<(), Error> { + let result = if self.state == State::Detached { + Ok(()) + } else { + Err(Error::InvalidState(&[StateKind::Detached], self.state.kind())) + }; + self.state = State::Idle; + result + } + + /// Detach the device + /// + /// Detach is always a valid transition + pub fn detach(&mut self) { + self.state = State::Detached; + self.consumer_capability = None; + self.requested_provider_capability = None; + } + + /// Disconnect this device + pub fn disconnect(&mut self, clear_caps: bool) -> Result<(), Error> { + let result = if matches!(self.state, State::ConnectedConsumer(_) | State::ConnectedProvider(_)) { + Ok(()) + } else { + Err(Error::InvalidState( + &[StateKind::ConnectedConsumer, StateKind::ConnectedProvider], + self.state.kind(), + )) + }; + self.state = State::Idle; + if clear_caps { + self.consumer_capability = None; + self.requested_provider_capability = None; + } + result + } + + /// Update the available consumer capability + pub fn update_consumer_power_capability( + &mut self, + capability: Option, + ) -> Result<(), Error> { + let result = if matches!( + self.state, + State::Idle | State::ConnectedConsumer(_) | State::ConnectedProvider(_) + ) { + Ok(()) + } else { + Err(Error::InvalidState( + &[ + StateKind::Idle, + StateKind::ConnectedConsumer, + StateKind::ConnectedProvider, + ], + self.state.kind(), + )) + }; + self.consumer_capability = capability; + result + } + + /// Updated the requested provider capability + pub fn update_requested_provider_power_capability( + &mut self, + capability: Option, + ) -> Result<(), Error> { + if self.requested_provider_capability == capability { + // Already operating at this capability, power policy is already aware, don't need to do anything + return Ok(()); + } + + let result = if matches!( + self.state, + State::Idle | State::ConnectedConsumer(_) | State::ConnectedProvider(_) + ) { + Ok(()) + } else { + Err(Error::InvalidState( + &[ + StateKind::Idle, + StateKind::ConnectedConsumer, + StateKind::ConnectedProvider, + ], + self.state.kind(), + )) + }; + + self.requested_provider_capability = capability; + result + } + + /// Handle a request to connect as a consumer from the policy + pub fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> Result<(), Error> { + let result = if self.state == State::Idle { + Ok(()) + } else { + Err(Error::InvalidState(&[StateKind::Idle], self.state.kind())) + }; + self.state = State::ConnectedConsumer(capability); + result + } + + /// Handle a request to connect as a provider from the policy + pub fn connect_provider(&mut self, capability: ProviderPowerCapability) -> Result<(), Error> { + let result = if self.state == State::Idle { + Ok(()) + } else { + Err(Error::InvalidState(&[StateKind::Idle], self.state.kind())) + }; + self.state = State::ConnectedProvider(capability); + result + } + + /// Returns the current state machine state + pub fn state(&self) -> State { + self.state + } + + /// Returns the current consumer capability + pub fn consumer_capability(&self) -> Option { + self.consumer_capability + } + + /// Returns the requested provider capability + pub fn requested_provider_capability(&self) -> Option { + self.requested_provider_capability + } } /// Data for a device request @@ -110,21 +258,39 @@ pub struct Response { pub data: ResponseData, } +/// Trait for devices that can be controlled by a power policy implementation +pub trait DeviceTrait { + /// Disconnect power from this device + fn disconnect(&mut self) -> impl Future>; + /// Connect this device to provide power to an external connection + fn connect_provider(&mut self, capability: ProviderPowerCapability) -> impl Future>; + /// Connect this device to consume power from an external connection + fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> impl Future>; +} + /// Device struct -pub struct Device { +pub struct Device<'a, D: Lockable, R: Receiver> +where + D::Inner: DeviceTrait, +{ /// Intrusive list node node: intrusive_list::Node, /// Device ID id: DeviceId, /// Current state of the device - state: Mutex, - /// Command channel - command: deferred::Channel, + pub state: Mutex, + /// Reference to hardware + pub device: &'a D, + /// Event receiver + pub receiver: Mutex, } -impl Device { +impl<'a, C: Lockable, R: Receiver> Device<'a, C, R> +where + C::Inner: DeviceTrait, +{ /// Create a new device - pub fn new(id: DeviceId) -> Self { + pub fn new(id: DeviceId, device: &'a C, receiver: R) -> Self { Self { node: intrusive_list::Node::uninit(), id, @@ -133,7 +299,8 @@ impl Device { consumer_capability: None, requested_provider_capability: None, }), - command: deferred::Channel::new(), + device, + receiver: Mutex::new(receiver), } } @@ -142,11 +309,6 @@ impl Device { self.id } - /// Returns the current state of the device - pub async fn state(&self) -> State { - self.state.lock().await.state - } - /// Returns the current consumer capability of the device pub async fn consumer_capability(&self) -> Option { self.state.lock().await.consumer_capability @@ -154,12 +316,12 @@ impl Device { /// Returns true if the device is currently consuming power pub async fn is_consumer(&self) -> bool { - self.state().await.kind() == StateKind::ConnectedConsumer + self.state.lock().await.state.kind() == StateKind::ConnectedConsumer } /// Returns current provider power capability pub async fn provider_capability(&self) -> Option { - match self.state().await { + match self.state.lock().await.state { State::ConnectedProvider(capability) => Some(capability), _ => None, } @@ -172,115 +334,33 @@ impl Device { /// Returns true if the device is currently providing power pub async fn is_provider(&self) -> bool { - self.state().await.kind() == StateKind::ConnectedProvider - } - - /// Execute a command on the device - pub(super) async fn execute_device_command(&self, command: CommandData) -> Result { - self.command.execute(command).await - } - - /// Create a handler for the command channel - /// - /// DROP SAFETY: Direct call to deferred channel primitive - pub async fn receive(&self) -> deferred::Request<'_, GlobalRawMutex, CommandData, InternalResponseData> { - self.command.receive().await - } - - /// Internal function to set device state - pub(super) async fn set_state(&self, new_state: State) { - let mut lock = self.state.lock().await; - let state = lock.deref_mut(); - state.state = new_state; - } - - /// Internal function to set consumer capability - pub(super) async fn update_consumer_capability(&self, capability: Option) { - let mut lock = self.state.lock().await; - let state = lock.deref_mut(); - state.consumer_capability = capability; - } - - /// Internal function to set requested provider capability - pub(super) async fn update_requested_provider_capability(&self, capability: Option) { - let mut lock = self.state.lock().await; - let state = lock.deref_mut(); - state.requested_provider_capability = capability; - } - - /// Try to provide access to the device actions for the given state - pub async fn try_device_action(&self) -> Result, Error> { - let state = self.state().await.kind(); - if S::kind() != state { - return Err(Error::InvalidState(S::kind(), state)); - } - Ok(action::device::Device::new(self)) - } - - /// Provide access to the current device state - pub async fn device_action(&self) -> action::device::AnyState<'_> { - match self.state().await.kind() { - StateKind::Detached => action::device::AnyState::Detached(action::device::Device::new(self)), - StateKind::Idle => action::device::AnyState::Idle(action::device::Device::new(self)), - StateKind::ConnectedProvider => { - action::device::AnyState::ConnectedProvider(action::device::Device::new(self)) - } - StateKind::ConnectedConsumer => { - action::device::AnyState::ConnectedConsumer(action::device::Device::new(self)) - } - } - } - - /// Try to provide access to the policy actions for the given state - /// Implemented here for lifetime reasons - pub(super) async fn try_policy_action(&self) -> Result, Error> { - let state = self.state().await.kind(); - if S::kind() != state { - return Err(Error::InvalidState(S::kind(), state)); - } - Ok(action::policy::Policy::new(self)) - } - - /// Provide access to the current policy actions - /// Implemented here for lifetime reasons - pub(super) async fn policy_action(&self) -> action::policy::AnyState<'_> { - match self.state().await.kind() { - StateKind::Detached => action::policy::AnyState::Detached(action::policy::Policy::new(self)), - StateKind::Idle => action::policy::AnyState::Idle(action::policy::Policy::new(self)), - StateKind::ConnectedProvider => { - action::policy::AnyState::ConnectedProvider(action::policy::Policy::new(self)) - } - StateKind::ConnectedConsumer => { - action::policy::AnyState::ConnectedConsumer(action::policy::Policy::new(self)) - } - } - } - - /// Detach the device, this action is available in all states - pub async fn detach(&self) -> Result, Error> { - match self.device_action().await { - action::device::AnyState::Detached(state) => Ok(state), - action::device::AnyState::Idle(state) => state.detach().await, - action::device::AnyState::ConnectedProvider(state) => state.detach().await, - action::device::AnyState::ConnectedConsumer(state) => state.detach().await, - } + self.state.lock().await.state.kind() == StateKind::ConnectedProvider } } -impl intrusive_list::NodeContainer for Device { +impl + 'static> intrusive_list::NodeContainer for Device<'static, C, R> +where + C::Inner: DeviceTrait, +{ fn get_node(&self) -> &crate::Node { &self.node } } /// Trait for any container that holds a device -pub trait DeviceContainer { +pub trait DeviceContainer> +where + C::Inner: DeviceTrait, +{ /// Get the underlying device struct - fn get_power_policy_device(&self) -> &Device; + fn get_power_policy_device(&self) -> &Device<'_, C, R>; } -impl DeviceContainer for Device { - fn get_power_policy_device(&self) -> &Device { +impl> DeviceContainer for Device<'_, C, R> +where + C::Inner: DeviceTrait, +{ + fn get_power_policy_device(&self) -> &Device<'_, C, R> { self } } diff --git a/embedded-service/src/power/policy/mod.rs b/embedded-service/src/power/policy/mod.rs index 65fc5b90..bf1cb580 100644 --- a/embedded-service/src/power/policy/mod.rs +++ b/embedded-service/src/power/policy/mod.rs @@ -1,5 +1,4 @@ //! Power policy related data structures and messages -pub mod action; pub mod charger; pub mod device; pub mod flags; @@ -20,7 +19,7 @@ pub enum Error { /// The consume request was denied, contains maximum available power CannotConsume(Option), /// The device is not in the correct state (expected, actual) - InvalidState(device::StateKind, device::StateKind), + InvalidState(&'static [device::StateKind], device::StateKind), /// Invalid response InvalidResponse, /// Busy, the device cannot respond to the request at this time diff --git a/embedded-service/src/power/policy/policy.rs b/embedded-service/src/power/policy/policy.rs index 283429ad..b90e7559 100644 --- a/embedded-service/src/power/policy/policy.rs +++ b/embedded-service/src/power/policy/policy.rs @@ -1,35 +1,36 @@ //! Context for any power policy implementations +use core::marker::PhantomData; +use core::pin::pin; use core::sync::atomic::{AtomicBool, Ordering}; -use crate::GlobalRawMutex; use crate::broadcaster::immediate as broadcaster; +use crate::event::Receiver; +use crate::power::policy::device::DeviceTrait; use crate::power::policy::{CommsMessage, ConsumerPowerCapability, ProviderPowerCapability}; -use embassy_sync::channel::Channel; +use crate::sync::Lockable; +use embassy_futures::select::select_slice; use embassy_sync::once_lock::OnceLock; use super::charger::ChargerResponse; use super::device::{self}; -use super::{DeviceId, Error, action, charger}; +use super::{DeviceId, Error, charger}; use crate::power::policy::charger::ChargerResponseData::Ack; use crate::{error, intrusive_list}; -/// Number of slots for policy requests -const POLICY_CHANNEL_SIZE: usize = 1; - /// Data for a power policy request #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum RequestData { /// Notify that a device has attached - NotifyAttached, + Attached, /// Notify that available power for consumption has changed - NotifyConsumerCapability(Option), + UpdatedConsumerCapability(Option), /// Request the given amount of power to provider - RequestProviderCapability(ProviderPowerCapability), + RequestedProviderCapability(Option), /// Notify that a device cannot consume or provide power anymore - NotifyDisconnect, + Disconnected, /// Notify that a device has detached - NotifyDetached, + Detached, } /// Request to the power policy service @@ -69,17 +70,10 @@ pub struct Response { pub data: ResponseData, } -/// Wrapper type to make code cleaner -type InternalResponseData = Result; - /// Power policy context struct Context { /// Registered devices devices: intrusive_list::IntrusiveList, - /// Policy request - policy_request: Channel, - /// Policy response - policy_response: Channel, /// Registered chargers chargers: intrusive_list::IntrusiveList, /// Message broadcaster @@ -91,8 +85,6 @@ impl Context { Self { devices: intrusive_list::IntrusiveList::new(), chargers: intrusive_list::IntrusiveList::new(), - policy_request: Channel::new(), - policy_response: Channel::new(), broadcaster: broadcaster::Immediate::default(), } } @@ -106,9 +98,14 @@ pub fn init() { } /// Register a device with the power policy service -pub async fn register_device(device: &'static impl device::DeviceContainer) -> Result<(), intrusive_list::Error> { +pub async fn register_device + 'static>( + device: &'static impl device::DeviceContainer, +) -> Result<(), intrusive_list::Error> +where + C::Inner: DeviceTrait, +{ let device = device.get_power_policy_device(); - if get_device(device.id()).await.is_some() { + if get_device::(device.id()).await.is_some() { return Err(intrusive_list::Error::NodeAlreadyInList); } @@ -126,9 +123,14 @@ pub async fn register_charger(device: &'static impl charger::ChargerContainer) - } /// Find a device by its ID -async fn get_device(id: DeviceId) -> Option<&'static device::Device> { +async fn get_device + 'static>( + id: DeviceId, +) -> Option<&'static device::Device<'static, C, R>> +where + C::Inner: DeviceTrait, +{ for device in &CONTEXT.get().await.devices { - if let Some(data) = device.data::() { + if let Some(data) = device.data::>() { if data.id() == id { return Some(data); } @@ -155,19 +157,6 @@ async fn get_charger(id: charger::ChargerId) -> Option<&'static charger::Device> None } -/// Convenience function to send a request to the power policy service -pub(super) async fn send_request(from: DeviceId, request: RequestData) -> Result { - let context = CONTEXT.get().await; - context - .policy_request - .send(Request { - id: from, - data: request, - }) - .await; - context.policy_response.receive().await -} - /// Initialize chargers in hardware pub async fn init_chargers() -> ChargerResponse { for charger in &CONTEXT.get().await.chargers { @@ -200,9 +189,17 @@ pub async fn register_message_receiver( } /// Singleton struct to give access to the power policy context -pub struct ContextToken(()); +pub struct ContextToken> +where + D::Inner: DeviceTrait, +{ + _phantom: PhantomData<(D, R)>, +} -impl ContextToken { +impl + 'static> ContextToken +where + D::Inner: DeviceTrait, +{ /// Create a new context token, returning None if this function has been called before pub fn create() -> Option { static INIT: AtomicBool = AtomicBool::new(false); @@ -211,7 +208,7 @@ impl ContextToken { } INIT.store(true, Ordering::SeqCst); - Some(ContextToken(())) + Some(ContextToken { _phantom: PhantomData }) } /// Initialize Policy charger devices @@ -224,18 +221,8 @@ impl ContextToken { Ok(()) } - /// Wait for a power policy request - pub async fn wait_request(&self) -> Request { - CONTEXT.get().await.policy_request.receive().await - } - - /// Send a response to a power policy request - pub async fn send_response(&self, response: Result) { - CONTEXT.get().await.policy_response.send(response).await - } - /// Get a device by its ID - pub async fn get_device(&self, id: DeviceId) -> Result<&'static device::Device, Error> { + pub async fn get_device(&self, id: DeviceId) -> Result<&'static device::Device<'static, D, R>, Error> { get_device(id).await.ok_or(Error::InvalidDevice) } @@ -254,21 +241,29 @@ impl ContextToken { &CONTEXT.get().await.chargers } - /// Try to provide access to the actions available to the policy for the given state and device - pub async fn try_policy_action( - &self, - id: DeviceId, - ) -> Result, Error> { - self.get_device(id).await?.try_policy_action().await - } - - /// Provide access to current policy actions - pub async fn policy_action(&self, id: DeviceId) -> Result, Error> { - Ok(self.get_device(id).await?.policy_action().await) - } - /// Broadcast a power policy message to all subscribers pub async fn broadcast_message(&self, message: CommsMessage) { CONTEXT.get().await.broadcaster.broadcast(message).await; } + + /// Get the next pending device event + pub async fn wait_request(&self) -> Request { + let mut futures = heapless::Vec::<_, 16>::new(); + for device in self.devices().await.iter_only::>() { + // TODO: check this at compile time + let _ = futures.push(async { device.receiver.lock().await.wait_next().await }); + } + + let (event, index) = select_slice(pin!(&mut futures)).await; + let device = self + .devices() + .await + .iter_only::>() + .nth(index) + .unwrap(); + Request { + id: device.id(), + data: event, + } + } } diff --git a/examples/rt633/Cargo.lock b/examples/rt633/Cargo.lock index 09e8d6c8..10b727a9 100644 --- a/examples/rt633/Cargo.lock +++ b/examples/rt633/Cargo.lock @@ -725,6 +725,7 @@ dependencies = [ "critical-section", "defmt 0.3.100", "document-features", + "embassy-futures", "embassy-sync", "embassy-time", "embedded-batteries-async", diff --git a/examples/rt685s-evk/Cargo.lock b/examples/rt685s-evk/Cargo.lock index afe68334..fa4117df 100644 --- a/examples/rt685s-evk/Cargo.lock +++ b/examples/rt685s-evk/Cargo.lock @@ -714,6 +714,7 @@ dependencies = [ "critical-section", "defmt 0.3.100", "document-features", + "embassy-futures", "embassy-sync", "embassy-time", "embedded-batteries-async", diff --git a/examples/std/Cargo.lock b/examples/std/Cargo.lock index c3cc9d59..fc666c3b 100644 --- a/examples/std/Cargo.lock +++ b/examples/std/Cargo.lock @@ -731,6 +731,7 @@ dependencies = [ "cortex-m-rt", "critical-section", "document-features", + "embassy-futures", "embassy-sync", "embassy-time", "embedded-batteries-async", diff --git a/power-policy-service/Cargo.toml b/power-policy-service/Cargo.toml index 9b9032d8..298bcffb 100644 --- a/power-policy-service/Cargo.toml +++ b/power-policy-service/Cargo.toml @@ -19,6 +19,14 @@ embassy-time.workspace = true embedded-services.workspace = true log = { workspace = true, optional = true } +[dev-dependencies] +static_cell.workspace = true +critical-section = { workspace = true, features = ["std"] } +embassy-time = { workspace = true, features = ["std", "generic-queue-8"] } +tokio = { workspace = true, features = ["rt", "macros", "time"] } +env_logger = "0.9.0" +log = { workspace = true } + [features] default = [] defmt = [ diff --git a/power-policy-service/src/consumer.rs b/power-policy-service/src/consumer.rs index d7d8055e..1efe3584 100644 --- a/power-policy-service/src/consumer.rs +++ b/power-policy-service/src/consumer.rs @@ -2,6 +2,7 @@ use core::cmp::Ordering; use embedded_services::debug; use embedded_services::power::policy::charger::Device as ChargerDevice; use embedded_services::power::policy::charger::PolicyEvent; +use embedded_services::power::policy::device::StateKind; use embedded_services::power::policy::policy::check_chargers_ready; use embedded_services::power::policy::policy::init_chargers; @@ -35,14 +36,17 @@ fn cmp_consumer_capability( )) } -impl PowerPolicy { +impl + 'static> PowerPolicy +where + D::Inner: DeviceTrait, +{ /// Iterate over all devices to determine what is best power port provides the highest power async fn find_best_consumer(&self, state: &InternalState) -> Result, Error> { let mut best_consumer = None; let current_consumer_id = state.current_consumer_state.map(|f| f.device_id); for node in self.context.devices().await { - let device = node.data::().ok_or(Error::InvalidDevice)?; + let device = node.data::>().ok_or(Error::InvalidDevice)?; // Update the best available consumer best_consumer = match (best_consumer, device.consumer_capability().await) { @@ -83,7 +87,7 @@ impl PowerPolicy { // Count how many available unconstrained devices we have let mut unconstrained_new = UnconstrainedState::default(); for node in self.context.devices().await { - let device = node.data::().ok_or(Error::InvalidDevice)?; + let device = node.data::>().ok_or(Error::InvalidDevice)?; if let Some(capability) = device.consumer_capability().await { // The device is considered unconstrained if it meets the auto unconstrained power threshold let auto_unconstrained = self @@ -190,18 +194,19 @@ impl PowerPolicy { } state.current_consumer_state = None; - // Disconnect the current consumer if needed - if let Ok(consumer) = self - .context - .try_policy_action::(current_consumer.device_id) - .await - { - info!( - "Device {}, disconnecting current consumer", - current_consumer.device_id.0 - ); + let consumer_device = self.context.get_device(current_consumer.device_id).await?; + if matches!(consumer_device.state.lock().await.state(), State::ConnectedConsumer(_)) { + // Disconnect the current consumer if needed + info!("Device{}: Disconnecting current consumer", current_consumer.device_id.0); // disconnect current consumer and set idle - consumer.disconnect().await?; + consumer_device.device.lock().await.disconnect().await?; + if let Err(e) = consumer_device.state.lock().await.disconnect(false) { + // This should never happen because we check the state above, log an error instead of a panic + error!( + "Device{}: Disconnect transition failed: {:#?}", + current_consumer.device_id.0, e + ); + } } // If no chargers are registered, they won't receive the new power capability. @@ -219,28 +224,38 @@ impl PowerPolicy { } info!("Device {}, connecting new consumer", new_consumer.device_id.0); - if let Ok(idle) = self - .context - .try_policy_action::(new_consumer.device_id) - .await - { - idle.connect_consumer(new_consumer.consumer_power_capability).await?; - self.post_consumer_connected(state, new_consumer).await?; - } else if let Ok(provider) = self - .context - .try_policy_action::(new_consumer.device_id) - .await - { - provider + let device = self.context.get_device(new_consumer.device_id).await?; + let device_state = device.state.lock().await.state(); + + if matches!(device_state, device::State::Idle | device::State::ConnectedConsumer(_)) { + device + .device + .lock() + .await .connect_consumer(new_consumer.consumer_power_capability) .await?; - state.current_consumer_state = Some(new_consumer); + if let Err(e) = device + .state + .lock() + .await + .connect_consumer(new_consumer.consumer_power_capability) + { + // Should never happen because we checked the state above, log an error instead of a panic + error!( + "Device{}: Connect state transition failed: {:#?}", + new_consumer.device_id.0, e + ); + } self.post_consumer_connected(state, new_consumer).await?; + Ok(()) } else { - error!("Error obtaining device in idle state"); + error!( + "Device{}: Not ready to connect consumer, state: {:#?}", + device.id().0, + device_state + ); + Err(Error::InvalidState(&[StateKind::Idle], device_state.kind())) } - - Ok(()) } /// Determines and connects the best external power diff --git a/power-policy-service/src/lib.rs b/power-policy-service/src/lib.rs index d1ae8b5b..768a7e1e 100644 --- a/power-policy-service/src/lib.rs +++ b/power-policy-service/src/lib.rs @@ -1,10 +1,12 @@ #![no_std] use core::ops::DerefMut; use embassy_sync::mutex::Mutex; -use embassy_sync::once_lock::OnceLock; use embedded_services::GlobalRawMutex; -use embedded_services::power::policy::device::Device; -use embedded_services::power::policy::{action, policy, *}; +use embedded_services::event::Receiver; +use embedded_services::power::policy::device::{Device, DeviceTrait, State}; +use embedded_services::power::policy::policy::RequestData; +use embedded_services::power::policy::{policy, *}; +use embedded_services::sync::Lockable; use embedded_services::{comms, error, info}; pub mod config; @@ -25,9 +27,12 @@ struct InternalState { } /// Power policy state -pub struct PowerPolicy { +pub struct PowerPolicy> +where + D::Inner: DeviceTrait, +{ /// Power policy context - context: policy::ContextToken, + context: policy::ContextToken, /// State state: Mutex, /// Comms endpoint @@ -36,7 +41,10 @@ pub struct PowerPolicy { config: config::Config, } -impl PowerPolicy { +impl + 'static> PowerPolicy +where + D::Inner: DeviceTrait, +{ /// Create a new power policy pub fn create(config: config::Config) -> Option { Some(Self { @@ -47,37 +55,75 @@ impl PowerPolicy { }) } - async fn process_notify_attach(&self) -> Result<(), Error> { - self.context.send_response(Ok(policy::ResponseData::Complete)).await; - Ok(()) + async fn process_notify_attach(&self, device: &Device<'_, D, R>) { + if let Err(e) = device.state.lock().await.attach() { + error!("Device{}: Invalid state for attach: {:#?}", device.id().0, e); + } } - async fn process_notify_detach(&self) -> Result<(), Error> { - self.context.send_response(Ok(policy::ResponseData::Complete)).await; - self.update_current_consumer().await?; - Ok(()) + async fn process_notify_detach(&self, device: &Device<'_, D, R>) -> Result<(), Error> { + device.state.lock().await.detach(); + self.update_current_consumer().await } - async fn process_notify_consumer_power_capability(&self) -> Result<(), Error> { - self.context.send_response(Ok(policy::ResponseData::Complete)).await; - self.update_current_consumer().await?; - Ok(()) + async fn process_notify_consumer_power_capability( + &self, + device: &Device<'_, D, R>, + capability: Option, + ) -> Result<(), Error> { + if let Err(e) = device.state.lock().await.update_consumer_power_capability(capability) { + error!( + "Device{}: Invalid state for notify consumer capability, catching up: {:#?}", + device.id().0, + e, + ); + } + + self.update_current_consumer().await } - async fn process_request_provider_power_capabilities(&self, device: DeviceId) -> Result<(), Error> { - self.context.send_response(Ok(policy::ResponseData::Complete)).await; - self.connect_provider(device).await; - Ok(()) + async fn process_request_provider_power_capabilities( + &self, + device: &Device<'_, D, R>, + capability: Option, + ) -> Result<(), Error> { + if let Err(e) = device + .state + .lock() + .await + .update_requested_provider_power_capability(capability) + { + error!( + "Device{}: Invalid state for notify consumer capability, catching up: {:#?}", + device.id().0, + e, + ); + } + + self.connect_provider(device.id()).await } - async fn process_notify_disconnect(&self) -> Result<(), Error> { - self.context.send_response(Ok(policy::ResponseData::Complete)).await; - if let Some(consumer) = self.state.lock().await.current_consumer_state.take() { - info!("Device{}: Connected consumer disconnected", consumer.device_id.0); + async fn process_notify_disconnect(&self, device: &Device<'_, D, R>) -> Result<(), Error> { + if let Err(e) = device.state.lock().await.disconnect(true) { + error!( + "Device{}: Invalid state for notify disconnect, catching up: {:#?}", + device.id().0, + e, + ); + } + + if self + .state + .lock() + .await + .current_consumer_state + .is_some_and(|current| current.device_id == device.id()) + { + info!("Device{}: Connected consumer disconnected", device.id().0); self.disconnect_chargers().await?; self.comms_notify(CommsMessage { - data: CommsData::ConsumerDisconnected(consumer.device_id), + data: CommsData::ConsumerDisconnected(device.id()), }) .await; } @@ -103,33 +149,35 @@ impl PowerPolicy { let device = self.context.get_device(request.id).await?; match request.data { - policy::RequestData::NotifyAttached => { + policy::RequestData::Attached => { info!("Received notify attached from device {}", device.id().0); - self.process_notify_attach().await + self.process_notify_attach(device).await; + Ok(()) } - policy::RequestData::NotifyDetached => { + policy::RequestData::Detached => { info!("Received notify detached from device {}", device.id().0); - self.process_notify_detach().await + self.process_notify_detach(device).await } - policy::RequestData::NotifyConsumerCapability(capability) => { + policy::RequestData::UpdatedConsumerCapability(capability) => { info!( "Device{}: Received notify consumer capability: {:#?}", device.id().0, capability, ); - self.process_notify_consumer_power_capability().await + self.process_notify_consumer_power_capability(device, capability).await } - policy::RequestData::RequestProviderCapability(capability) => { + policy::RequestData::RequestedProviderCapability(capability) => { info!( "Device{}: Received request provider capability: {:#?}", device.id().0, capability, ); - self.process_request_provider_power_capabilities(device.id()).await + self.process_request_provider_power_capabilities(device, capability) + .await } - policy::RequestData::NotifyDisconnect => { + policy::RequestData::Disconnected => { info!("Received notify disconnect from device {}", device.id().0); - self.process_notify_disconnect().await + self.process_notify_disconnect(device).await } } } @@ -141,23 +189,7 @@ impl PowerPolicy { } } -impl comms::MailboxDelegate for PowerPolicy {} - -#[embassy_executor::task] -pub async fn task(config: config::Config) { - info!("Starting power policy task"); - static POLICY: OnceLock = OnceLock::new(); - let policy = - POLICY.get_or_init(|| PowerPolicy::create(config).expect("Power policy singleton already initialized")); - - if comms::register_endpoint(policy, &policy.tp).await.is_err() { - error!("Failed to register power policy endpoint"); - return; - } - - loop { - if let Err(e) = policy.process().await { - error!("Error processing request: {:?}", e); - } - } +impl + 'static> comms::MailboxDelegate for PowerPolicy where + D::Inner: DeviceTrait +{ } diff --git a/power-policy-service/src/provider.rs b/power-policy-service/src/provider.rs index 90b857cf..ea89e176 100644 --- a/power-policy-service/src/provider.rs +++ b/power-policy-service/src/provider.rs @@ -3,7 +3,12 @@ //! the system is in unlimited power state. In this mode up to [provider_unlimited](super::Config::provider_unlimited) //! is provided to each device. Above this threshold, the system is in limited power state. //! In this mode [provider_limited](super::Config::provider_limited) is provided to each device -use embedded_services::{debug, trace}; +use embedded_services::{ + debug, + event::Receiver, + power::policy::{device::StateKind, policy::RequestData}, + trace, +}; use super::*; @@ -25,30 +30,27 @@ pub(super) struct State { state: PowerState, } -impl PowerPolicy { +impl + 'static> PowerPolicy +where + D::Inner: DeviceTrait, +{ /// Attempt to connect the requester as a provider - pub(super) async fn connect_provider(&self, requester_id: DeviceId) { + pub(super) async fn connect_provider(&self, requester_id: DeviceId) -> Result<(), Error> { trace!("Device{}: Attempting to connect as provider", requester_id.0); - let requester = match self.context.get_device(requester_id).await { - Ok(device) => device, - Err(_) => { - error!("Device{}: Invalid device", requester_id.0); - return; - } - }; + let requester = self.context.get_device(requester_id).await?; let requested_power_capability = match requester.requested_provider_capability().await { Some(cap) => cap, // Requester is no longer requesting power _ => { - info!("Device{}: No-longer requesting power", requester.id().0); - return; + error!("Device{}: No-longer requesting power", requester.id().0); + return Err(Error::CannotProvide(None)); } }; let mut state = self.state.lock().await; let mut total_power_mw = 0; // Determine total requested power draw - for device in self.context.devices().await.iter_only::() { + for device in self.context.devices().await.iter_only::>() { let target_provider_cap = if device.id() == requester_id { // Use the requester's requested power capability // this handles both new connections and upgrade requests @@ -87,26 +89,17 @@ impl PowerPolicy { } }; - let connected = if let Ok(action) = self.context.try_policy_action::(requester.id()).await { - let _ = action.connect_provider(target_power).await; - Ok(()) - } else if let Ok(action) = self - .context - .try_policy_action::(requester.id()) - .await - { - let _ = action.connect_provider(target_power).await; - Ok(()) + let device = self.context.get_device(requester_id).await?; + let state = device.state.lock().await.state(); + if matches!(state, device::State::Idle | device::State::ConnectedProvider(_)) { + device.device.lock().await.connect_provider(target_power).await } else { - Err(Error::InvalidState( - device::StateKind::Idle, - requester.state().await.kind(), - )) - }; - - // Don't need to do anything special, the device is responsible for attempting to reconnect - if let Err(e) = connected { - error!("Device{}: Failed to connect as provider, {:#?}", requester.id().0, e); + error!( + "Device{}: Cannot provide, device is in state {:#?}", + device.id().0, + state + ); + Err(Error::InvalidState(&[StateKind::Idle], state.kind())) } } } diff --git a/power-policy-service/tests/common/mock.rs b/power-policy-service/tests/common/mock.rs new file mode 100644 index 00000000..d8722638 --- /dev/null +++ b/power-policy-service/tests/common/mock.rs @@ -0,0 +1,81 @@ +use embassy_sync::signal::Signal; +use embedded_services::power::policy::device::{DeviceTrait, InternalState}; +use embedded_services::power::policy::flags::Consumer; +use embedded_services::power::policy::policy::RequestData; +use embedded_services::power::policy::{ConsumerPowerCapability, Error, PowerCapability, ProviderPowerCapability}; +use embedded_services::{GlobalRawMutex, event, info}; + +#[derive(Debug, Clone, PartialEq, Eq)] +#[allow(dead_code)] +pub enum FnCall { + ConnectConsumer(ConsumerPowerCapability), + ConnectProvider(ProviderPowerCapability), + Disconnect, + Reset, +} + +pub struct Mock<'a, S: event::Sender> { + sender: S, + fn_call: &'a Signal, + // Internal state + pub state: InternalState, +} + +impl<'a, S: event::Sender> Mock<'a, S> { + pub fn new(sender: S, fn_call: &'a Signal) -> Self { + Self { + sender, + fn_call, + state: Default::default(), + } + } + + fn record_fn_call(&mut self, fn_call: FnCall) { + let num_fn_calls = self + .fn_call + .try_take() + .map(|(num_fn_calls, _)| num_fn_calls) + .unwrap_or(1); + self.fn_call.signal((num_fn_calls, fn_call)); + } + + pub async fn simulate_consumer_connection(&mut self, capability: PowerCapability) { + self.state.attach().unwrap(); + + self.sender.send(RequestData::Attached).await; + + let capability = Some(ConsumerPowerCapability { + capability, + flags: Consumer::none(), + }); + self.state.update_consumer_power_capability(capability).unwrap(); + self.sender + .send(RequestData::UpdatedConsumerCapability(capability)) + .await; + } + + pub async fn simulate_detach(&mut self) { + self.state.detach(); + self.sender.send(RequestData::Detached).await; + } +} + +impl<'a, S: event::Sender> DeviceTrait for Mock<'a, S> { + async fn connect_consumer(&mut self, capability: ConsumerPowerCapability) -> Result<(), Error> { + info!("Connect consumer {:#?}", capability); + self.record_fn_call(FnCall::ConnectConsumer(capability)); + Ok(()) + } + + async fn connect_provider(&mut self, capability: ProviderPowerCapability) -> Result<(), Error> { + info!("Connect provider: {:#?}", capability); + self.record_fn_call(FnCall::ConnectProvider(capability)); + Ok(()) + } + + async fn disconnect(&mut self) -> Result<(), Error> { + info!("Disconnect"); + self.record_fn_call(FnCall::Disconnect); + Ok(()) + } +} diff --git a/power-policy-service/tests/common/mod.rs b/power-policy-service/tests/common/mod.rs new file mode 100644 index 00000000..33885699 --- /dev/null +++ b/power-policy-service/tests/common/mod.rs @@ -0,0 +1,132 @@ +use embassy_futures::{ + join::join, + select::{Either, select}, +}; +use embassy_sync::{ + channel::{Channel, DynamicReceiver, DynamicSender}, + mutex::Mutex, + signal::Signal, +}; +use embassy_time::{Duration, with_timeout}; +use embedded_services::{ + GlobalRawMutex, + power::policy::{self, DeviceId, PowerCapability, device, policy::RequestData}, +}; +use power_policy_service::PowerPolicy; + +pub mod mock; + +use mock::Mock; +use static_cell::StaticCell; + +use crate::common::mock::FnCall; + +pub const LOW_POWER: PowerCapability = PowerCapability { + voltage_mv: 5000, + current_ma: 1500, +}; + +#[allow(dead_code)] +pub const HIGH_POWER: PowerCapability = PowerCapability { + voltage_mv: 5000, + current_ma: 3000, +}; + +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15); + +const EVENT_CHANNEL_SIZE: usize = 4; + +async fn power_policy_task( + completion_signal: &'static Signal, + power_policy: &'static PowerPolicy< + Mutex>>, + DynamicReceiver<'static, RequestData>, + >, +) { + loop { + match select(power_policy.process(), completion_signal.wait()).await { + Either::First(result) => result.unwrap(), + Either::Second(_) => { + break; + } + } + } +} + +pub async fn run_test>( + timeout: Duration, + test: impl FnOnce( + &'static Mutex>>, + &'static Signal, + &'static Mutex>>, + &'static Signal, + ) -> F, +) { + env_logger::builder().filter_level(log::LevelFilter::Trace).init(); + embedded_services::init().await; + + static DEVICE0_EVENT_CHANNEL: StaticCell> = + StaticCell::new(); + let device0_event_channel = DEVICE0_EVENT_CHANNEL.init(Channel::new()); + let device0_sender = device0_event_channel.dyn_sender(); + let device0_receiver = device0_event_channel.dyn_receiver(); + + static DEVICE0_SIGNAL: StaticCell> = StaticCell::new(); + let device0_signal = DEVICE0_SIGNAL.init(Signal::new()); + static DEVICE0: StaticCell>>> = StaticCell::new(); + let device0 = DEVICE0.init(Mutex::new(Mock::new(device0_sender, device0_signal))); + + static DEVICE0_REGISTRATION: StaticCell< + device::Device< + 'static, + Mutex>>, + DynamicReceiver<'static, RequestData>, + >, + > = StaticCell::new(); + let device0_registration = DEVICE0_REGISTRATION.init(device::Device::new(DeviceId(0), device0, device0_receiver)); + + policy::register_device(device0_registration).await.unwrap(); + + static DEVICE1_EVENT_CHANNEL: StaticCell> = + StaticCell::new(); + let device1_event_channel = DEVICE1_EVENT_CHANNEL.init(Channel::new()); + let device1_sender = device1_event_channel.dyn_sender(); + let device1_receiver = device1_event_channel.dyn_receiver(); + + static DEVICE1_SIGNAL: StaticCell> = StaticCell::new(); + let device1_signal = DEVICE1_SIGNAL.init(Signal::new()); + static DEVICE1: StaticCell>>> = StaticCell::new(); + let device1 = DEVICE1.init(Mutex::new(Mock::new(device1_sender, device1_signal))); + + static DEVICE1_REGISTRATION: StaticCell< + device::Device< + 'static, + Mutex>>, + DynamicReceiver<'static, RequestData>, + >, + > = StaticCell::new(); + let device1_registration = DEVICE1_REGISTRATION.init(device::Device::new(DeviceId(1), device1, device1_receiver)); + + policy::register_device(device1_registration).await.unwrap(); + + static POWER_POLICY: StaticCell< + PowerPolicy< + Mutex>>, + DynamicReceiver<'static, RequestData>, + >, + > = StaticCell::new(); + let power_policy = POWER_POLICY.init(power_policy_service::PowerPolicy::create(Default::default()).unwrap()); + + static COMPLETION_SIGNAL: StaticCell> = StaticCell::new(); + let completion_signal = COMPLETION_SIGNAL.init(Signal::new()); + + with_timeout( + timeout, + join(power_policy_task(completion_signal, power_policy), async { + test(device0, device0_signal, device1, device1_signal).await; + completion_signal.signal(()); + }), + ) + .await + .unwrap(); +} diff --git a/power-policy-service/tests/consumer.rs b/power-policy-service/tests/consumer.rs new file mode 100644 index 00000000..c201e527 --- /dev/null +++ b/power-policy-service/tests/consumer.rs @@ -0,0 +1,134 @@ +use embassy_sync::{channel::DynamicSender, mutex::Mutex, signal::Signal}; +use embassy_time::{Duration, TimeoutError, with_timeout}; +use embedded_services::{ + GlobalRawMutex, + power::policy::{ConsumerPowerCapability, flags::Consumer, policy::RequestData}, +}; + +mod common; + +use common::LOW_POWER; + +use crate::common::{ + DEFAULT_TIMEOUT, HIGH_POWER, + mock::{FnCall, Mock}, + run_test, +}; + +const PER_CALL_TIMEOUT: Duration = Duration::from_millis(1000); + +/// Test the basic consumer flow with a single device. +async fn test_single( + device0: &'static Mutex>>, + device0_signal: &'static Signal, +) { + // Test initial connection + { + device0.lock().await.simulate_consumer_connection(LOW_POWER).await; + + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device0_signal.wait()).await.unwrap(), + ( + 1, + FnCall::ConnectConsumer(ConsumerPowerCapability { + capability: LOW_POWER, + flags: Consumer::none(), + }) + ) + ); + device0_signal.reset(); + } + // Test detach + { + device0.lock().await.simulate_detach().await; + + // Power policy shouldn't call any functions on detach so we'll timeout + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device0_signal.wait()).await, + Err(TimeoutError) + ); + device0_signal.reset(); + } +} + +/// Test swapping to a higher powered device. +async fn test_swap_higher( + device0: &'static Mutex>>, + device0_signal: &'static Signal, + device1: &'static Mutex>>, + device1_signal: &'static Signal, +) { + // Device0 connection at low power + { + device0.lock().await.simulate_consumer_connection(LOW_POWER).await; + + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device0_signal.wait()).await.unwrap(), + ( + 1, + FnCall::ConnectConsumer(ConsumerPowerCapability { + capability: LOW_POWER, + flags: Consumer::none(), + }) + ) + ); + device0_signal.reset(); + } + // Device1 connection at high power + { + device1.lock().await.simulate_consumer_connection(HIGH_POWER).await; + + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device0_signal.wait()).await.unwrap(), + (1, FnCall::Disconnect) + ); + device0_signal.reset(); + + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device1_signal.wait()).await.unwrap(), + ( + 1, + FnCall::ConnectConsumer(ConsumerPowerCapability { + capability: HIGH_POWER, + flags: Consumer::none(), + }) + ) + ); + device1_signal.reset(); + } + // Test detach device1, should reconnect device0 + { + device1.lock().await.simulate_detach().await; + + // Power policy shouldn't call any functions on detach so we'll timeout + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device1_signal.wait()).await, + Err(TimeoutError) + ); + + assert_eq!( + with_timeout(PER_CALL_TIMEOUT, device0_signal.wait()).await.unwrap(), + ( + 1, + FnCall::ConnectConsumer(ConsumerPowerCapability { + capability: LOW_POWER, + flags: Consumer::none(), + }) + ) + ); + device0_signal.reset(); + } +} + +/// Run all tests, this is temporary to deal with 'static lifetimes until the intrusive list refactor is done. +#[tokio::test] +async fn run_all_tests() { + run_test( + DEFAULT_TIMEOUT, + |device0, device0_signal, device1, device1_signal| async move { + test_single(device0, device0_signal).await; + test_swap_higher(device0, device0_signal, device1, device1_signal).await; + }, + ) + .await; +} diff --git a/type-c-service/src/wrapper/backing.rs b/type-c-service/src/wrapper/backing.rs index 53cd2bdb..0dd44b96 100644 --- a/type-c-service/src/wrapper/backing.rs +++ b/type-c-service/src/wrapper/backing.rs @@ -21,7 +21,7 @@ //! use embedded_services::type_c::ControllerId; //! use embedded_services::power; //! use embedded_usb_pd::GlobalPortId; -//! use type_c_service::wrapper::backing::{Storage, ReferencedStorage}; +//! use type_c_service::wrapper::backing::{Storage, IntermediateStorage, ReferencedStorage}; //! //! //! const NUM_PORTS: usize = 2; @@ -33,8 +33,10 @@ //! 0x0, //! [(GlobalPortId(0), power::policy::DeviceId(0)), (GlobalPortId(1), power::policy::DeviceId(1))], //! )); +//! static INTERMEDIATE: StaticCell> = StaticCell::new(); +//! let intermediate = INTERMEDIATE.init(storage.create_intermediate()); //! static REFERENCED: StaticCell> = StaticCell::new(); -//! let referenced = REFERENCED.init(storage.create_referenced()); +//! let referenced = REFERENCED.init(intermediate.create_referenced()); //! let _backing = referenced.create_backing().unwrap(); //! } //! ``` @@ -45,12 +47,17 @@ use core::{ use embassy_sync::{ blocking_mutex::raw::RawMutex, + mutex::Mutex, pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}, }; use embassy_time::Instant; use embedded_cfu_protocol::protocol_definitions::ComponentId; use embedded_services::{ - power, + event, + power::{ + self, + policy::{DeviceId, policy}, + }, type_c::{ ControllerId, controller::PortStatus, @@ -59,7 +66,13 @@ use embedded_services::{ }; use embedded_usb_pd::{GlobalPortId, ado::Ado}; -use crate::{PortEventStreamer, wrapper::cfu}; +use crate::{ + PortEventStreamer, + wrapper::{ + cfu, + proxy::{PowerProxyChannel, PowerProxyDevice, PowerProxyReceiver}, + }, +}; /// Per-port state pub struct PortState<'a> { @@ -96,13 +109,14 @@ impl Default for ControllerState { } /// Internal state containing all per-port and per-controller state -struct InternalState<'a, const N: usize> { +struct InternalState<'a, const N: usize, S: event::Sender> { controller_state: ControllerState, port_states: [PortState<'a>; N], + port_power: [PortPower; N], } -impl<'a, const N: usize> InternalState<'a, N> { - fn new(storage: &'a Storage) -> Self { +impl<'a, const N: usize, S: event::Sender> InternalState<'a, N, S> { + fn new(storage: &'a Storage, power_events: [S; N]) -> Self { Self { controller_state: ControllerState::default(), port_states: from_fn(|i| PortState { @@ -115,11 +129,15 @@ impl<'a, const N: usize> InternalState<'a, N> { storage.pd_alerts[i].dyn_subscriber().unwrap(), ), }), + port_power: power_events.map(|sender| PortPower { + sender, + state: Default::default(), + }), } } } -impl<'a, const N: usize> DynPortState<'a> for InternalState<'a, N> { +impl<'a, const N: usize, S: event::Sender> DynPortState<'a, S> for InternalState<'a, N, S> { fn num_ports(&self) -> usize { self.port_states.len() } @@ -139,10 +157,18 @@ impl<'a, const N: usize> DynPortState<'a> for InternalState<'a, N> { fn controller_state_mut(&mut self) -> &mut ControllerState { &mut self.controller_state } + + fn port_power(&self) -> &[PortPower] { + &self.port_power + } + + fn port_power_mut(&mut self) -> &mut [PortPower] { + &mut self.port_power + } } /// Trait to erase the generic port count argument -pub trait DynPortState<'a> { +pub trait DynPortState<'a, S: event::Sender> { fn num_ports(&self) -> usize; fn port_states(&self) -> &[PortState<'a>]; @@ -150,16 +176,19 @@ pub trait DynPortState<'a> { fn controller_state(&self) -> &ControllerState; fn controller_state_mut(&mut self) -> &mut ControllerState; + + fn port_power(&self) -> &[PortPower]; + fn port_power_mut(&mut self) -> &mut [PortPower]; } /// Service registration objects -pub struct Registration<'a> { +pub struct Registration<'a, M: RawMutex, R: event::Receiver> { pub pd_controller: &'a embedded_services::type_c::controller::Device<'a>, pub cfu_device: &'a embedded_services::cfu::component::CfuDevice, - pub power_devices: &'a [embedded_services::power::policy::device::Device], + pub power_devices: &'a [embedded_services::power::policy::device::Device<'a, Mutex>, R>], } -impl<'a> Registration<'a> { +impl<'a, M: RawMutex, R: event::Receiver> Registration<'a, M, R> { pub fn num_ports(&self) -> usize { self.power_devices.len() } @@ -168,36 +197,83 @@ impl<'a> Registration<'a> { /// PD alerts should be fairly uncommon, four seems like a reasonable number to start with. const MAX_BUFFERED_PD_ALERTS: usize = 4; +pub struct PortPower> { + pub sender: S, + pub state: power::policy::device::InternalState, +} + /// Base storage pub struct Storage { // Registration-related controller_id: ControllerId, pd_ports: [GlobalPortId; N], cfu_device: embedded_services::cfu::component::CfuDevice, - power_devices: [embedded_services::power::policy::device::Device; N], + power_proxy_channels: [PowerProxyChannel; N], // State-related pd_alerts: [PubSubChannel; N], } impl Storage { - pub fn new( - controller_id: ControllerId, - cfu_id: ComponentId, - ports: [(GlobalPortId, power::policy::DeviceId); N], - ) -> Self { + pub fn new(controller_id: ControllerId, cfu_id: ComponentId, pd_ports: [GlobalPortId; N]) -> Self { Self { controller_id, - pd_ports: ports.map(|(port, _)| port), + pd_ports, cfu_device: embedded_services::cfu::component::CfuDevice::new(cfu_id), - power_devices: ports.map(|(_, device)| embedded_services::power::policy::device::Device::new(device)), + power_proxy_channels: from_fn(|_| PowerProxyChannel::new()), pd_alerts: [const { PubSubChannel::new() }; N], } } - /// Create referenced storage from this storage - pub fn create_referenced(&self) -> ReferencedStorage<'_, N, M> { - ReferencedStorage::from_storage(self) + /// Create intermediate storage from this storage + pub fn create_intermediate(&self) -> IntermediateStorage<'_, N, M> { + IntermediateStorage::from_storage(self) + } +} + +/// Intermediate storage that holds power proxy devices +pub struct IntermediateStorage<'a, const N: usize, M: RawMutex> { + storage: &'a Storage, + power_proxy_devices: [Mutex>; N], + power_proxy_receivers: [Mutex>; N], +} + +impl<'a, const N: usize, M: RawMutex> IntermediateStorage<'a, N, M> { + fn from_storage(storage: &'a Storage) -> Self { + let mut power_proxy_devices = heapless::Vec::<_, N>::new(); + let mut power_proxy_receivers = heapless::Vec::<_, N>::new(); + + for power_proxy_channel in storage.power_proxy_channels.iter() { + // Safe because everything has a length of N + power_proxy_devices + .push(Mutex::new(power_proxy_channel.get_device())) + .unwrap_or_else(|_| panic!("Failed to insert power proxy device")); + power_proxy_receivers + .push(Mutex::new(power_proxy_channel.get_receiver())) + .unwrap_or_else(|_| panic!("Failed to insert power proxy receiver")); + } + + Self { + storage, + // Safe because both have N elements + power_proxy_devices: power_proxy_devices + .into_array() + .unwrap_or_else(|_| panic!("Failed to create power devices")), + power_proxy_receivers: power_proxy_receivers + .into_array() + .unwrap_or_else(|_| panic!("Failed to create power receivers")), + } + } + + /// Create referenced storage from this intermediate storage + pub fn create_referenced<'b, S: event::Sender, R: event::Receiver>( + &'b self, + policy_args: [(DeviceId, S, R); N], + ) -> ReferencedStorage<'b, N, M, S, R> + where + 'b: 'a, + { + ReferencedStorage::from_intermediate(self, policy_args) } } @@ -205,43 +281,79 @@ impl Storage { /// /// To simplify usage, we use interior mutability through a ref cell to avoid having to declare the state /// completely separately. -pub struct ReferencedStorage<'a, const N: usize, M: RawMutex> { - storage: &'a Storage, - state: RefCell>, +pub struct ReferencedStorage< + 'a, + const N: usize, + M: RawMutex, + S: event::Sender, + R: event::Receiver, +> { + intermediate: &'a IntermediateStorage<'a, N, M>, + state: RefCell>, pd_controller: embedded_services::type_c::controller::Device<'a>, + power_devices: [embedded_services::power::policy::device::Device<'a, Mutex>, R>; N], } -impl<'a, const N: usize, M: RawMutex> ReferencedStorage<'a, N, M> { - /// Create a new referenced storage from the given storage and controller ID - fn from_storage(storage: &'a Storage) -> Self { +impl<'a, const N: usize, M: RawMutex, S: event::Sender, R: event::Receiver> + ReferencedStorage<'a, N, M, S, R> +{ + /// Create a new referenced storage from the given intermediate storage + fn from_intermediate(intermediate: &'a IntermediateStorage<'a, N, M>, policy_args: [(DeviceId, S, R); N]) -> Self { + let mut power_senders = heapless::Vec::<_, N>::new(); + let mut power_devices = heapless::Vec::<_, N>::new(); + + for (i, (device_id, policy_sender, policy_receiver)) in policy_args.into_iter().enumerate() { + power_senders + .push(policy_sender) + .unwrap_or_else(|_| panic!("Failed to insert policy sender")); + power_devices + .push(embedded_services::power::policy::device::Device::new( + device_id, + &intermediate.power_proxy_devices[i], + policy_receiver, + )) + .unwrap_or_else(|_| panic!("Failed to insert power device")); + } + Self { - storage, - state: RefCell::new(InternalState::new(storage)), + intermediate, + state: RefCell::new(InternalState::new( + intermediate.storage, + // Safe because both have N elements + power_senders + .into_array() + .unwrap_or_else(|_| panic!("Failed to create power events")), + )), pd_controller: embedded_services::type_c::controller::Device::new( - storage.controller_id, - storage.pd_ports.as_slice(), + intermediate.storage.controller_id, + intermediate.storage.pd_ports.as_slice(), ), + power_devices: power_devices + .into_array() + .unwrap_or_else(|_| panic!("Failed to create power devices")), } } /// Creates the backing, returns `None` if a backing has already been created - pub fn create_backing<'b>(&'b self) -> Option> + pub fn create_backing<'b>(&'b self) -> Option> where 'b: 'a, { - self.state.try_borrow_mut().ok().map(|state| Backing { + self.state.try_borrow_mut().ok().map(|state| Backing:: { registration: Registration { pd_controller: &self.pd_controller, - cfu_device: &self.storage.cfu_device, - power_devices: &self.storage.power_devices, + cfu_device: &self.intermediate.storage.cfu_device, + power_devices: &self.power_devices, }, state, + power_receivers: &self.intermediate.power_proxy_receivers, }) } } /// Wrapper around registration and type-erased state -pub struct Backing<'a> { - pub(crate) registration: Registration<'a>, - pub(crate) state: RefMut<'a, dyn DynPortState<'a>>, +pub struct Backing<'a, M: RawMutex, S: event::Sender, R: event::Receiver> { + pub(crate) registration: Registration<'a, M, R>, + pub(crate) state: RefMut<'a, dyn DynPortState<'a, S>>, + pub(crate) power_receivers: &'a [Mutex>], } diff --git a/type-c-service/src/wrapper/cfu.rs b/type-c-service/src/wrapper/cfu.rs index 1f23cfc7..66aae457 100644 --- a/type-c-service/src/wrapper/cfu.rs +++ b/type-c-service/src/wrapper/cfu.rs @@ -4,6 +4,7 @@ use embassy_futures::select::{Either, select}; use embedded_cfu_protocol::protocol_definitions::*; use embedded_services::cfu::component::{InternalResponseData, RequestData}; use embedded_services::power; +use embedded_services::power::policy::policy; use embedded_services::type_c::controller::Controller; use embedded_services::{debug, error}; @@ -29,7 +30,14 @@ impl FwUpdateState { } } -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { @@ -98,7 +106,7 @@ where async fn process_abort_update( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, ) -> InternalResponseData { // abort the update process match controller.abort_fw_update().await { @@ -123,7 +131,7 @@ where async fn process_give_content( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, content: &FwUpdateContentCommand, ) -> InternalResponseData { let data = &content.data[0..content.header.data_length as usize]; @@ -133,47 +141,14 @@ where // Detach from the power policy so it doesn't attempt to do anything while we are updating let controller_id = self.registration.pd_controller.id(); - let mut detached_all = true; - for power in self.registration.power_devices { + for power in state.port_power_mut() { info!("Controller{}: checking power device", controller_id.0); - if power.state().await != power::policy::device::State::Detached { + if power.state.state() != power::policy::device::State::Detached { info!("Controller{}: Detaching power device", controller_id.0); - if let Err(e) = power.detach().await { - error!("Controller{}: Failed to detach power device: {:?}", controller_id.0, e); - - // Sync to bring the controller to a known state with all services - match self.sync_state_internal(controller, state).await { - Ok(_) => debug!( - "Controller{}: Synced state after detaching power device", - controller_id.0 - ), - Err(Error::Pd(e)) => error!( - "Controller{}: Failed to sync state after detaching power device: {:?}", - controller_id.0, e - ), - Err(Error::Bus(_)) => error!( - "Controller{}: Failed to sync state after detaching power device, bus error", - controller_id.0 - ), - } - - detached_all = false; - break; - } + power.sender.send(policy::RequestData::Detached).await; } } - if !detached_all { - error!( - "Controller{}: Failed to detach all power devices, rejecting offer", - controller_id.0 - ); - return InternalResponseData::ContentResponse(FwUpdateContentResponse::new( - content.header.sequence_num, - CfuUpdateContentResponseStatus::ErrorPrepare, - )); - } - // Need to start the update self.fw_update_ticker.lock().await.reset(); match controller.start_fw_update().await { @@ -250,7 +225,7 @@ where } /// Process a CFU tick - pub async fn process_cfu_tick(&self, controller: &mut C::Inner, state: &mut dyn DynPortState<'_>) { + pub async fn process_cfu_tick(&self, controller: &mut C::Inner, state: &mut dyn DynPortState<'_, S>) { match state.controller_state_mut().fw_update_state { FwUpdateState::Idle => { // No FW update in progress, nothing to do @@ -293,7 +268,7 @@ where pub async fn process_cfu_command( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, command: &RequestData, ) -> InternalResponseData { if state.controller_state().fw_update_state == FwUpdateState::Recovery { diff --git a/type-c-service/src/wrapper/dp.rs b/type-c-service/src/wrapper/dp.rs index 8bae8560..d02fd561 100644 --- a/type-c-service/src/wrapper/dp.rs +++ b/type-c-service/src/wrapper/dp.rs @@ -1,10 +1,17 @@ use super::{ControllerWrapper, FwOfferValidator}; use crate::wrapper::message::OutputDpStatusChanged; use embassy_sync::blocking_mutex::raw::RawMutex; -use embedded_services::{sync::Lockable, trace, type_c::controller::Controller}; +use embedded_services::{event, power::policy::policy, sync::Lockable, trace, type_c::controller::Controller}; use embedded_usb_pd::{Error, LocalPortId}; -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { diff --git a/type-c-service/src/wrapper/message.rs b/type-c-service/src/wrapper/message.rs index f8416b3a..e5650f21 100644 --- a/type-c-service/src/wrapper/message.rs +++ b/type-c-service/src/wrapper/message.rs @@ -31,12 +31,11 @@ pub struct EventPortNotification { } /// Power policy command event data -pub struct EventPowerPolicyCommand<'a> { +pub struct EventPowerPolicyCommand { /// Port ID pub port: LocalPortId, /// Power policy request - pub request: - deferred::Request<'a, GlobalRawMutex, policy::device::CommandData, policy::device::InternalResponseData>, + pub request: policy::device::CommandData, } /// CFU events @@ -58,7 +57,7 @@ pub enum Event<'a> { /// Port notification PortNotification(EventPortNotification), /// Power policy command received - PowerPolicyCommand(EventPowerPolicyCommand<'a>), + PowerPolicyCommand(EventPowerPolicyCommand), /// Command from TCPM ControllerCommand(deferred::Request<'a, GlobalRawMutex, controller::Command, controller::Response<'static>>), /// Cfu event @@ -88,12 +87,9 @@ pub struct OutputPdAlert { } /// Power policy command output data -pub struct OutputPowerPolicyCommand<'a> { +pub struct OutputPowerPolicyCommand { /// Port ID pub port: LocalPortId, - /// Power policy request - pub request: - deferred::Request<'a, GlobalRawMutex, policy::device::CommandData, policy::device::InternalResponseData>, /// Response pub response: policy::device::InternalResponseData, } @@ -158,7 +154,7 @@ pub enum Output<'a> { /// Vendor-defined messaging. Vdm(vdm::Output), /// Power policy command received - PowerPolicyCommand(OutputPowerPolicyCommand<'a>), + PowerPolicyCommand(OutputPowerPolicyCommand), /// TPCM command response ControllerCommand(OutputControllerCommand<'a>), /// CFU recovery tick diff --git a/type-c-service/src/wrapper/mod.rs b/type-c-service/src/wrapper/mod.rs index 3b010ee2..5eb8ddd9 100644 --- a/type-c-service/src/wrapper/mod.rs +++ b/type-c-service/src/wrapper/mod.rs @@ -27,9 +27,9 @@ use embassy_sync::mutex::Mutex; use embassy_sync::signal::Signal; use embassy_time::Instant; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion}; -use embedded_services::GlobalRawMutex; +use embedded_services::event; use embedded_services::power::policy::device::StateKind; -use embedded_services::power::policy::{self, action}; +use embedded_services::power::policy::policy; use embedded_services::sync::Lockable; use embedded_services::type_c::controller::{self, Controller, PortStatus}; use embedded_services::type_c::event::{PortEvent, PortNotificationSingle, PortPending, PortStatusChanged}; @@ -37,8 +37,9 @@ use embedded_services::{debug, error, info, trace, warn}; use embedded_usb_pd::ado::Ado; use embedded_usb_pd::{Error, LocalPortId, PdError}; -use crate::wrapper::backing::DynPortState; +use crate::wrapper::backing::{DynPortState, PortPower}; use crate::wrapper::message::*; +use crate::wrapper::proxy::PowerProxyReceiver; use crate::{PortEventStreamer, PortEventVariant}; pub mod backing; @@ -47,6 +48,7 @@ mod dp; pub mod message; mod pd; mod power; +pub mod proxy; mod vdm; /// Base interval for checking for FW update timeouts and recovery attempts @@ -67,8 +69,14 @@ pub trait FwOfferValidator { pub const MAX_SUPPORTED_PORTS: usize = 2; /// Common functionality implemented on top of [`embedded_services::type_c::controller::Controller`] -pub struct ControllerWrapper<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> -where +pub struct ControllerWrapper< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> where ::Inner: Controller, { controller: &'device C, @@ -77,21 +85,30 @@ where /// FW update ticker used to check for timeouts and recovery attempts fw_update_ticker: Mutex, /// Registration information for services - registration: backing::Registration<'device>, + registration: backing::Registration<'device, M, R>, /// State - state: Mutex>>, + state: Mutex>>, /// SW port status event signal sw_status_event: Signal, + /// Power proxy receivers + power_proxy_receivers: &'device [Mutex>], } -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { /// Create a new controller wrapper, returns `None` if the backing storage is already in use pub fn try_new( controller: &'device C, - storage: &'device backing::ReferencedStorage<'device, N, M>, + storage: &'device backing::ReferencedStorage<'device, N, M, S, R>, fw_version_validator: V, ) -> Option { const { @@ -108,14 +125,10 @@ where registration: backing.registration, state: Mutex::new(backing.state), sw_status_event: Signal::new(), + power_proxy_receivers: backing.power_receivers, }) } - /// Get the power policy devices for this controller. - pub fn power_policy_devices(&self) -> &[policy::device::Device] { - self.registration.power_devices - } - /// Get the cached port status, returns None if the port is invalid pub async fn get_cached_port_status(&self, local_port: LocalPortId) -> Option { self.state @@ -138,7 +151,7 @@ where async fn sync_state_internal( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, ) -> Result<(), Error<::BusError>> { // Sync the controller state with the PD controller for (i, port_state) in state.port_states_mut().iter_mut().enumerate() { @@ -175,7 +188,7 @@ where async fn process_plug_event( &self, _controller: &mut C::Inner, - power: &policy::device::Device, + power: &mut PortPower, port: LocalPortId, status: &PortStatus, ) -> Result<(), Error<::BusError>> { @@ -187,32 +200,12 @@ where info!("Plug event"); if status.is_connected() { info!("Plug inserted"); - - // Recover if we're not in the correct state - if power.state().await.kind() != StateKind::Detached { - warn!("Power device not in detached state, recovering"); - if let Err(e) = power.detach().await { - error!("Error detaching power device: {:?}", e); - return PdError::Failed.into(); - } - } - - if let Ok(state) = power.try_device_action::().await { - if let Err(e) = state.attach().await { - error!("Error attaching power device: {:?}", e); - return PdError::Failed.into(); - } - } else { - // This should never happen - error!("Power device not in detached state"); - return PdError::InvalidMode.into(); + if let Err(e) = power.state.attach() { + warn!("Power device not in detached state, recovering: {:#?}", e); } } else { info!("Plug removed"); - if let Err(e) = power.detach().await { - error!("Error detaching power device: {:?}", e); - return PdError::Failed.into(); - }; + power.state.detach(); } Ok(()) @@ -222,7 +215,7 @@ where async fn process_port_status_changed<'b>( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, local_port_id: LocalPortId, status_event: PortStatusChanged, ) -> Result, Error<::BusError>> { @@ -234,11 +227,12 @@ where let status = controller.get_port_status(local_port_id).await?; trace!("Port{} status: {:#?}", global_port_id.0, status); - - let power = self - .get_power_device(local_port_id) - .ok_or(Error::Pd(PdError::InvalidPort))?; trace!("Port{} status events: {:#?}", global_port_id.0, status_event); + + let power = state + .port_power_mut() + .get_mut(local_port_id.0 as usize) + .ok_or(PdError::InvalidPort)?; if status_event.plug_inserted_or_removed() { self.process_plug_event(controller, power, local_port_id, &status) .await?; @@ -272,7 +266,7 @@ where /// Finalize a port status change output async fn finalize_port_status_change( &self, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, local_port: LocalPortId, status_event: PortStatusChanged, status: PortStatus, @@ -307,7 +301,7 @@ where /// Finalize a PD alert output async fn finalize_pd_alert( &self, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, local_port: LocalPortId, alert: Ado, ) -> Result<(), Error<::BusError>> { @@ -500,13 +494,9 @@ where } Event::PowerPolicyCommand(EventPowerPolicyCommand { port, request }) => { let response = self - .process_power_command(&mut controller, state.deref_mut().deref_mut(), port, &request.command) + .process_power_command(&mut controller, state.deref_mut().deref_mut(), port, &request) .await; - Ok(Output::PowerPolicyCommand(OutputPowerPolicyCommand { - port, - request, - response, - })) + Ok(Output::PowerPolicyCommand(OutputPowerPolicyCommand { port, response })) } Event::ControllerCommand(request) => { let response = self @@ -556,8 +546,14 @@ where .finalize_vdm(state.deref_mut().deref_mut(), vdm) .await .map_err(Error::Pd), - Output::PowerPolicyCommand(OutputPowerPolicyCommand { request, response, .. }) => { - request.respond(response); + Output::PowerPolicyCommand(OutputPowerPolicyCommand { port, response }) => { + self.power_proxy_receivers + .get(port.0 as usize) + .ok_or(Error::Pd(PdError::InvalidPort))? + .lock() + .await + .send(response) + .await; Ok(()) } Output::ControllerCommand(OutputControllerCommand { request, response }) => { @@ -631,7 +627,14 @@ where } } -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> Lockable for ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> Lockable for ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { diff --git a/type-c-service/src/wrapper/pd.rs b/type-c-service/src/wrapper/pd.rs index 2b24fd47..f6798184 100644 --- a/type-c-service/src/wrapper/pd.rs +++ b/type-c-service/src/wrapper/pd.rs @@ -2,6 +2,7 @@ use embassy_futures::yield_now; use embassy_sync::pubsub::WaitResult; use embassy_time::{Duration, Timer}; use embedded_services::debug; +use embedded_services::power::policy::device::State; use embedded_services::type_c::Cached; use embedded_services::type_c::controller::{InternalResponseData, Response}; use embedded_usb_pd::constants::{T_PS_TRANSITION_EPR_MS, T_PS_TRANSITION_SPR_MS}; @@ -9,13 +10,20 @@ use embedded_usb_pd::ucsi::{self, lpm}; use super::*; -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { async fn process_get_pd_alert( &self, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, local_port: LocalPortId, ) -> Result, PdError> { if local_port.0 as usize >= state.num_ports() { @@ -46,7 +54,7 @@ where /// even for controllers that might not always broadcast sink ready events. pub(super) async fn check_sink_ready_timeout( &self, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, status: &PortStatus, port: LocalPortId, new_contract: bool, @@ -108,34 +116,24 @@ where LocalPortId(port_index as u8) } - /// Set the maximum sink voltage for a port - pub async fn set_max_sink_voltage(&self, local_port: LocalPortId, voltage_mv: Option) -> Result<(), PdError> { - let mut controller = self.controller.lock().await; - let _ = self - .process_set_max_sink_voltage(&mut controller, local_port, voltage_mv) - .await?; - Ok(()) - } - /// Process a request to set the maximum sink voltage for a port async fn process_set_max_sink_voltage( &self, controller: &mut C::Inner, + state: &mut dyn DynPortState<'_, S>, local_port: LocalPortId, voltage_mv: Option, ) -> Result { - let power_device = self.get_power_device(local_port).ok_or(PdError::InvalidPort)?; - - let state = power_device.state().await; + let port_power = state + .port_power_mut() + .get_mut(local_port.0 as usize) + .ok_or(PdError::InvalidPort)?; + let state = port_power.state.state(); debug!("Port{}: Current state: {:#?}", local_port.0, state); - if let Ok(connected_consumer) = power_device.try_device_action::().await { + if matches!(state, State::ConnectedConsumer(_)) { debug!("Port{}: Set max sink voltage, connected consumer found", local_port.0); if voltage_mv.is_some() - && voltage_mv - < power_device - .consumer_capability() - .await - .map(|c| c.capability.voltage_mv) + && voltage_mv < port_power.state.consumer_capability().map(|c| c.capability.voltage_mv) { // New max voltage is lower than current consumer capability which will trigger a renegociation // So disconnect first @@ -143,7 +141,7 @@ where "Port{}: Disconnecting consumer before setting max sink voltage", local_port.0 ); - let _ = connected_consumer.disconnect().await; + port_power.sender.send(policy::RequestData::Disconnected).await; } } @@ -159,7 +157,7 @@ where async fn process_get_port_status( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, local_port: LocalPortId, cached: Cached, ) -> Result { @@ -186,7 +184,7 @@ where async fn process_port_command( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, command: &controller::PortCommand, ) -> Response<'static> { if state.controller_state().fw_update_state.in_progress() { @@ -259,7 +257,7 @@ where controller::PortCommandData::SetMaxSinkVoltage(voltage_mv) => { match self.registration.pd_controller.lookup_local_port(command.port) { Ok(local_port) => { - self.process_set_max_sink_voltage(controller, local_port, voltage_mv) + self.process_set_max_sink_voltage(controller, state, local_port, voltage_mv) .await } Err(e) => Err(e), @@ -389,7 +387,7 @@ where async fn process_controller_command( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, command: &controller::InternalCommandData, ) -> Response<'static> { if state.controller_state().fw_update_state.in_progress() { @@ -425,7 +423,7 @@ where pub(super) async fn process_pd_command( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, command: &controller::Command, ) -> Response<'static> { match command { diff --git a/type-c-service/src/wrapper/power.rs b/type-c-service/src/wrapper/power.rs index e4aa919b..a3c54f36 100644 --- a/type-c-service/src/wrapper/power.rs +++ b/type-c-service/src/wrapper/power.rs @@ -1,132 +1,76 @@ //! Module contain power-policy related message handling -use core::future; +use core::pin::pin; +use embassy_futures::select::select_slice; use embedded_services::{ debug, - ipc::deferred, power::policy::{ ConsumerPowerCapability, ProviderPowerCapability, - device::{CommandData, InternalResponseData}, + device::{CommandData, InternalResponseData, ResponseData}, }, }; +use embedded_services::power::policy::Error as PowerError; +use embedded_services::power::policy::device::CommandData as PowerCommand; + use super::*; -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { - /// Return the power device for the given port - pub fn get_power_device(&self, port: LocalPortId) -> Option<&policy::device::Device> { - self.registration.power_devices.get(port.0 as usize) - } - /// Handle a new contract as consumer pub(super) async fn process_new_consumer_contract( &self, - power: &policy::device::Device, + power: &mut PortPower, status: &PortStatus, ) -> Result<(), Error<::BusError>> { info!("Process new consumer contract"); - let current_state = power.state().await.kind(); + let current_state = power.state.state(); info!("current power state: {:?}", current_state); - // Recover if we're not in the correct state - if status.is_connected() { - if let action::device::AnyState::Detached(state) = power.device_action().await { - warn!("Power device is detached, attempting to attach"); - if let Err(e) = state.attach().await { - error!("Error attaching power device: {:?}", e); - return PdError::Failed.into(); - } - } - } - let available_sink_contract = status.available_sink_contract.map(|c| { let mut c: ConsumerPowerCapability = c.into(); c.flags.set_unconstrained_power(status.unconstrained_power); c }); - if let Ok(state) = power.try_device_action::().await { - if let Err(e) = state.notify_consumer_power_capability(available_sink_contract).await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } else if let Ok(state) = power.try_device_action::().await { - if let Err(e) = state.notify_consumer_power_capability(available_sink_contract).await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } else if let Ok(state) = power.try_device_action::().await { - if let Err(e) = state.notify_consumer_power_capability(available_sink_contract).await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } else { - error!("Invalid mode"); - return PdError::InvalidMode.into(); + if let Err(e) = power.state.update_consumer_power_capability(available_sink_contract) { + warn!( + "Device was not in correct state for consumer contract, recovered: {:#?}", + e + ); } - Ok(()) } /// Handle a new contract as provider pub(super) async fn process_new_provider_contract( &self, - power: &policy::device::Device, + power: &mut PortPower, status: &PortStatus, ) -> Result<(), Error<::BusError>> { info!("Process New provider contract"); - let current_state = power.state().await.kind(); + let current_state = power.state.state(); info!("current power state: {:?}", current_state); - if let action::device::AnyState::ConnectedConsumer(state) = power.device_action().await { - info!("ConnectedConsumer"); - if let Err(e) = state.detach().await { - info!("Error detaching power device: {:?}", e); - return PdError::Failed.into(); - } + if let Err(e) = power.state.update_requested_provider_power_capability( + status.available_sink_contract.map(ProviderPowerCapability::from), + ) { + warn!( + "Device was not in correct state for provider contract, recovered: {:#?}", + e + ); } - - // Recover if we're not in the correct state - if status.is_connected() { - if let action::device::AnyState::Detached(state) = power.device_action().await { - warn!("Power device is detached, attempting to attach"); - if let Err(e) = state.attach().await { - error!("Error attaching power device: {:?}", e); - return PdError::Failed.into(); - } - } - } - - if let Ok(state) = power.try_device_action::().await { - if let Some(contract) = status.available_source_contract { - if let Err(e) = state.request_provider_power_capability(contract.into()).await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } - } else if let Ok(state) = power.try_device_action::().await { - if let Some(contract) = status.available_source_contract { - if let Err(e) = state.request_provider_power_capability(contract.into()).await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } else { - // No longer need to source, so disconnect - if let Err(e) = state.disconnect().await { - error!("Error setting power contract: {:?}", e); - return PdError::Failed.into(); - } - } - } else { - error!("Invalid mode"); - return PdError::InvalidMode.into(); - } - Ok(()) } @@ -135,15 +79,21 @@ where &self, port: LocalPortId, controller: &mut C::Inner, - power: &policy::device::Device, + power: &mut PortPower, ) -> Result<(), Error<::BusError>> { - let state = power.state().await.kind(); - if state == StateKind::ConnectedConsumer { + if power.state.state().kind() == StateKind::ConnectedConsumer { info!("Port{}: Disconnect from ConnectedConsumer", port.0); if controller.enable_sink_path(port, false).await.is_err() { error!("Error disabling sink path"); return PdError::Failed.into(); } + + if let Err(e) = power.state.disconnect(false) { + warn!( + "{:?}: Device was not in correct state for disconnect, recovered: {:#?}", + port, e + ); + } } Ok(()) @@ -165,22 +115,23 @@ where /// /// Returns (local port ID, deferred request) /// DROP SAFETY: Call to a select over drop safe futures - pub(super) async fn wait_power_command( - &self, - ) -> ( - LocalPortId, - deferred::Request<'_, GlobalRawMutex, CommandData, InternalResponseData>, - ) { - let futures: [_; MAX_SUPPORTED_PORTS] = from_fn(|i| async move { - if let Some(device) = self.registration.power_devices.get(i) { - device.receive().await - } else { - future::pending().await + pub(super) async fn wait_power_command(&self) -> (LocalPortId, CommandData) { + let mut futures = heapless::Vec::<_, MAX_SUPPORTED_PORTS>::new(); + for receiver in self.power_proxy_receivers { + if futures + .push(async { + let mut lock = receiver.lock().await; + lock.receive().await + }) + .is_err() + { + error!("Futures vec overflow"); } - }); + } + // DROP SAFETY: Select over drop safe futures - let (request, local_id) = select_array(futures).await; - trace!("Power command: device{} {:#?}", local_id, request.command); + let (request, local_id) = select_slice(pin!(futures.as_mut_slice())).await; + trace!("Power command: device{} {:#?}", local_id, request); (LocalPortId(local_id as u8), request) } @@ -189,53 +140,51 @@ where pub(super) async fn process_power_command( &self, controller: &mut C::Inner, - state: &mut dyn DynPortState<'_>, + state: &mut dyn DynPortState<'_, S>, port: LocalPortId, command: &CommandData, ) -> InternalResponseData { trace!("Processing power command: device{} {:#?}", port.0, command); if state.controller_state().fw_update_state.in_progress() { debug!("Port{}: Firmware update in progress", port.0); - return Err(policy::Error::Busy); + return Err(PowerError::Busy); } - let power = match self.get_power_device(port) { - Some(power) => power, - None => { - error!("Port{}: Error getting power device for port", port.0); - return Err(policy::Error::InvalidDevice); - } - }; + let power = state.port_power_mut().get_mut(port.0 as usize); + if power.is_none() { + return Err(PowerError::InvalidDevice); + } + let power = power.unwrap(); match command { - policy::device::CommandData::ConnectAsConsumer(capability) => { + PowerCommand::ConnectAsConsumer(capability) => { info!( "Port{}: Connect as consumer: {:?}, enable input switch", port.0, capability ); if controller.enable_sink_path(port, true).await.is_err() { error!("Error enabling sink path"); - return Err(policy::Error::Failed); + return Err(PowerError::Failed); } } - policy::device::CommandData::ConnectAsProvider(capability) => { + PowerCommand::ConnectAsProvider(capability) => { if self .process_connect_as_provider(port, *capability, controller) .await .is_err() { error!("Error processing connect provider"); - return Err(policy::Error::Failed); + return Err(PowerError::Failed); } } - policy::device::CommandData::Disconnect => { + PowerCommand::Disconnect => { if self.process_disconnect(port, controller, power).await.is_err() { error!("Error processing disconnect"); - return Err(policy::Error::Failed); + return Err(PowerError::Failed); } } } - Ok(policy::device::ResponseData::Complete) + Ok(ResponseData::Complete) } } diff --git a/type-c-service/src/wrapper/proxy.rs b/type-c-service/src/wrapper/proxy.rs new file mode 100644 index 00000000..7a69646a --- /dev/null +++ b/type-c-service/src/wrapper/proxy.rs @@ -0,0 +1,99 @@ +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; +use embedded_services::power; +use embedded_services::power::policy::device::{ + CommandData as PolicyCommandData, DeviceTrait, InternalResponseData as PolicyResponseData, +}; + +pub struct PowerProxyChannel { + command_channel: Channel, + response_channel: Channel, +} + +impl PowerProxyChannel { + pub fn new() -> Self { + Self { + command_channel: Channel::new(), + response_channel: Channel::new(), + } + } + + pub fn get_device(&self) -> PowerProxyDevice<'_> { + PowerProxyDevice { + sender: self.command_channel.dyn_sender(), + receiver: self.response_channel.dyn_receiver(), + } + } + + pub fn get_receiver(&self) -> PowerProxyReceiver<'_> { + PowerProxyReceiver { + receiver: self.command_channel.dyn_receiver(), + sender: self.response_channel.dyn_sender(), + } + } +} + +pub struct PowerProxyReceiver<'a> { + sender: DynamicSender<'a, PolicyResponseData>, + receiver: DynamicReceiver<'a, PolicyCommandData>, +} + +impl<'a> PowerProxyReceiver<'a> { + pub fn new( + receiver: DynamicReceiver<'a, PolicyCommandData>, + sender: DynamicSender<'a, PolicyResponseData>, + ) -> Self { + Self { receiver, sender } + } + + pub async fn receive(&mut self) -> PolicyCommandData { + self.receiver.receive().await + } + + pub async fn send(&mut self, response: PolicyResponseData) { + self.sender.send(response).await; + } +} + +pub struct PowerProxyDevice<'a> { + sender: DynamicSender<'a, PolicyCommandData>, + receiver: DynamicReceiver<'a, PolicyResponseData>, +} + +impl<'a> PowerProxyDevice<'a> { + pub fn new( + sender: DynamicSender<'a, PolicyCommandData>, + receiver: DynamicReceiver<'a, PolicyResponseData>, + ) -> Self { + Self { sender, receiver } + } + + async fn execute(&mut self, command: PolicyCommandData) -> PolicyResponseData { + self.sender.send(command).await; + self.receiver.receive().await + } +} + +impl<'a> DeviceTrait for PowerProxyDevice<'a> { + async fn disconnect(&mut self) -> Result<(), power::policy::Error> { + self.execute(PolicyCommandData::Disconnect).await?.complete_or_err() + } + + async fn connect_provider( + &mut self, + capability: power::policy::ProviderPowerCapability, + ) -> Result<(), power::policy::Error> { + self.execute(PolicyCommandData::ConnectAsProvider(capability)) + .await? + .complete_or_err() + } + + async fn connect_consumer( + &mut self, + capability: power::policy::ConsumerPowerCapability, + ) -> Result<(), power::policy::Error> { + self.execute(PolicyCommandData::ConnectAsConsumer(capability)) + .await? + .complete_or_err() + } +} diff --git a/type-c-service/src/wrapper/vdm.rs b/type-c-service/src/wrapper/vdm.rs index d803c4c5..ca7ca7fa 100644 --- a/type-c-service/src/wrapper/vdm.rs +++ b/type-c-service/src/wrapper/vdm.rs @@ -1,5 +1,7 @@ use embassy_sync::blocking_mutex::raw::RawMutex; use embedded_services::{ + event, + power::policy::policy, sync::Lockable, trace, type_c::{ @@ -13,7 +15,14 @@ use crate::wrapper::{DynPortState, message::vdm::OutputKind}; use super::{ControllerWrapper, FwOfferValidator, message::vdm::Output}; -impl<'device, M: RawMutex, C: Lockable, V: FwOfferValidator> ControllerWrapper<'device, M, C, V> +impl< + 'device, + M: RawMutex, + C: Lockable, + S: event::Sender, + R: event::Receiver, + V: FwOfferValidator, +> ControllerWrapper<'device, M, C, S, R, V> where ::Inner: Controller, { @@ -36,7 +45,11 @@ where } /// Finalize a VDM output by notifying the service. - pub(super) async fn finalize_vdm(&self, state: &mut dyn DynPortState<'_>, output: Output) -> Result<(), PdError> { + pub(super) async fn finalize_vdm( + &self, + state: &mut dyn DynPortState<'_, S>, + output: Output, + ) -> Result<(), PdError> { trace!("Finalizing VDM output: {:?}", output); let Output { port, kind } = output; let global_port_id = self.registration.pd_controller.lookup_global_port(port)?;