Skip to content

Commit 3fba2d8

Browse files
authored
power-policy: Broadcast events (#411)
This provides a more general way to receive events than the comms service.
1 parent 4c79138 commit 3fba2d8

File tree

4 files changed

+51
-3
lines changed

4 files changed

+51
-3
lines changed

embedded-service/src/power/policy/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ pub enum CommsData {
129129
Unconstrained(bool),
130130
}
131131

132+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133+
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
132134
/// Message to send with the comms service
133135
pub struct CommsMessage {
134136
/// Message data

embedded-service/src/power/policy/policy.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
use core::sync::atomic::{AtomicBool, Ordering};
33

44
use crate::GlobalRawMutex;
5-
use crate::power::policy::{ConsumerPowerCapability, ProviderPowerCapability};
5+
use crate::broadcaster::immediate as broadcaster;
6+
use crate::power::policy::{CommsMessage, ConsumerPowerCapability, ProviderPowerCapability};
67
use embassy_sync::channel::Channel;
78
use embassy_sync::once_lock::OnceLock;
89

@@ -81,6 +82,8 @@ struct Context {
8182
policy_response: Channel<GlobalRawMutex, InternalResponseData, POLICY_CHANNEL_SIZE>,
8283
/// Registered chargers
8384
chargers: intrusive_list::IntrusiveList,
85+
/// Message broadcaster
86+
broadcaster: broadcaster::Immediate<CommsMessage>,
8487
}
8588

8689
impl Context {
@@ -90,6 +93,7 @@ impl Context {
9093
chargers: intrusive_list::IntrusiveList::new(),
9194
policy_request: Channel::new(),
9295
policy_response: Channel::new(),
96+
broadcaster: broadcaster::Immediate::default(),
9397
}
9498
}
9599
}
@@ -188,6 +192,13 @@ pub async fn check_chargers_ready() -> ChargerResponse {
188192
Ok(Ack)
189193
}
190194

195+
/// Register a message receiver for power policy messages
196+
pub async fn register_message_receiver(
197+
receiver: &'static broadcaster::Receiver<'_, CommsMessage>,
198+
) -> intrusive_list::Result<()> {
199+
CONTEXT.get().await.broadcaster.register_receiver(receiver)
200+
}
201+
191202
/// Singleton struct to give access to the power policy context
192203
pub struct ContextToken(());
193204

@@ -255,4 +266,9 @@ impl ContextToken {
255266
pub async fn policy_action(&self, id: DeviceId) -> Result<action::policy::AnyState<'_>, Error> {
256267
Ok(self.get_device(id).await?.policy_action().await)
257268
}
269+
270+
/// Broadcast a power policy message to all subscribers
271+
pub async fn broadcast_message(&self, message: CommsMessage) {
272+
CONTEXT.get().await.broadcaster.broadcast(message).await;
273+
}
258274
}

examples/std/src/bin/power_policy.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use embassy_executor::{Executor, Spawner};
2-
use embassy_sync::once_lock::OnceLock;
2+
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, once_lock::OnceLock, pubsub::PubSubChannel};
33
use embassy_time::{self as _, Timer};
4-
use embedded_services::power::policy::{self, ConsumerPowerCapability, PowerCapability, device, flags};
4+
use embedded_services::{
5+
broadcaster::immediate as broadcaster,
6+
power::policy::{self, ConsumerPowerCapability, PowerCapability, device, flags},
7+
};
58
use log::*;
69
use static_cell::StaticCell;
710

@@ -186,6 +189,31 @@ async fn run(spawner: Spawner) {
186189
Timer::after_millis(250).await;
187190
}
188191

192+
#[embassy_executor::task]
193+
async fn receiver_task() {
194+
static CHANNEL: StaticCell<PubSubChannel<NoopRawMutex, policy::CommsMessage, 1, 1, 0>> = StaticCell::new();
195+
let channel = CHANNEL.init(PubSubChannel::new());
196+
197+
let publisher = channel.dyn_immediate_publisher();
198+
let mut subscriber = channel.dyn_subscriber().unwrap();
199+
200+
static RECEIVER: StaticCell<broadcaster::Receiver<'static, policy::CommsMessage>> = StaticCell::new();
201+
let receiver = RECEIVER.init(broadcaster::Receiver::new(publisher));
202+
203+
policy::policy::register_message_receiver(receiver).await.unwrap();
204+
205+
loop {
206+
match subscriber.next_message().await {
207+
embassy_sync::pubsub::WaitResult::Message(msg) => {
208+
info!("Received message: {msg:?}");
209+
}
210+
embassy_sync::pubsub::WaitResult::Lagged(count) => {
211+
warn!("Lagged messages: {count}");
212+
}
213+
}
214+
}
215+
}
216+
189217
fn main() {
190218
env_logger::builder().filter_level(log::LevelFilter::Trace).init();
191219

@@ -196,5 +224,6 @@ fn main() {
196224
power_policy_service::config::Config::default(),
197225
));
198226
spawner.must_spawn(run(spawner));
227+
spawner.must_spawn(receiver_task());
199228
});
200229
}

power-policy-service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ impl PowerPolicy {
8686

8787
/// Send a notification with the comms service
8888
async fn comms_notify(&self, message: CommsMessage) {
89+
self.context.broadcast_message(message).await;
8990
let _ = self
9091
.tp
9192
.send(comms::EndpointID::Internal(comms::Internal::Battery), &message)

0 commit comments

Comments
 (0)