Skip to content

Commit 15e6533

Browse files
committed
Add direct async calls proof of concept
1 parent 35cba7d commit 15e6533

File tree

3 files changed

+380
-0
lines changed

3 files changed

+380
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::{future::Future, pin::pin};
2+
3+
use embassy_futures::select::select_slice;
4+
use embassy_sync::{
5+
blocking_mutex::raw::NoopRawMutex,
6+
mutex::Mutex,
7+
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
8+
};
9+
use log::{info, warn};
10+
11+
pub trait EventReceiver {
12+
fn wait_next(&mut self) -> impl Future<Output = InterruptEvent>;
13+
}
14+
15+
pub trait EventSender {
16+
fn on_interrupt(&self, pin: u8, level: bool);
17+
}
18+
19+
/// IO expander device trait
20+
pub trait Device {
21+
fn name(&self) -> &str;
22+
23+
fn set_level(&mut self, pin: u8, value: bool) -> impl Future<Output = ()>;
24+
}
25+
26+
pub struct Sender<'channel> {
27+
publisher: DynImmediatePublisher<'channel, InterruptEvent>,
28+
}
29+
30+
impl<'channel> Sender<'channel> {
31+
pub fn new(publisher: DynImmediatePublisher<'channel, InterruptEvent>) -> Self {
32+
Self { publisher }
33+
}
34+
}
35+
36+
pub struct Receiver<'channel> {
37+
subscriber: DynSubscriber<'channel, InterruptEvent>,
38+
}
39+
40+
impl<'channel> Receiver<'channel> {
41+
pub fn new(subscriber: DynSubscriber<'channel, InterruptEvent>) -> Self {
42+
Self { subscriber }
43+
}
44+
}
45+
46+
impl EventReceiver for Receiver<'_> {
47+
async fn wait_next(&mut self) -> InterruptEvent {
48+
loop {
49+
match self.subscriber.next_message().await {
50+
WaitResult::Message(msg) => return msg,
51+
WaitResult::Lagged(n) => {
52+
warn!("Receiver lagged by {} messages", n);
53+
}
54+
}
55+
}
56+
}
57+
}
58+
59+
impl EventSender for Sender<'_> {
60+
fn on_interrupt(&self, pin: u8, level: bool) {
61+
self.publisher.publish_immediate(InterruptEvent { pin, level });
62+
}
63+
}
64+
65+
#[derive(Copy, Clone)]
66+
pub struct InterruptEvent {
67+
pub pin: u8,
68+
pub level: bool,
69+
}
70+
71+
const MAX_SUPPORTED_DEVICES: usize = 4;
72+
73+
pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> {
74+
devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)],
75+
}
76+
77+
impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> {
78+
pub fn new(devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)]) -> Self {
79+
Self { devices }
80+
}
81+
82+
pub async fn wait_next(&mut self) -> (&'device Mutex<NoopRawMutex, D>, InterruptEvent) {
83+
let futures =
84+
heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next()));
85+
86+
let (event, index) = select_slice(pin!(futures)).await;
87+
(self.devices[index].1, event)
88+
}
89+
90+
pub async fn process_event(&mut self, event: (&'device Mutex<NoopRawMutex, D>, InterruptEvent)) {
91+
let mut device = event.0.lock().await;
92+
info!(
93+
"Interrupt from device {}: pin {}, level {}",
94+
device.name(),
95+
event.1.pin,
96+
event.1.level
97+
);
98+
if event.1.pin == 0 {
99+
info!("Asserting INT_OUT pin");
100+
}
101+
102+
device.set_level(1, event.1.level).await;
103+
}
104+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use embassy_executor::Executor;
2+
use embassy_futures::join::join3;
3+
use embassy_sync::{blocking_mutex::raw::NoopRawMutex, mutex::Mutex, pubsub::PubSubChannel};
4+
use embassy_time::Timer;
5+
use static_cell::StaticCell;
6+
7+
use crate::{io_expander::EventSender as _, power::EventSender as _};
8+
9+
mod io_expander;
10+
mod power;
11+
12+
pub struct Device<I: io_expander::EventSender, P: power::EventSender> {
13+
name: &'static str,
14+
io_sender: I,
15+
power_sender: P,
16+
}
17+
18+
pub struct DeviceContainer<I: io_expander::EventSender, P: power::EventSender> {
19+
inner: Mutex<NoopRawMutex, Device<I, P>>,
20+
}
21+
22+
impl<'device, I: io_expander::EventSender, P: power::EventSender> DeviceContainer<I, P> {
23+
pub fn new(name: &'static str, io_sender: I, power_sender: P) -> Self {
24+
Self {
25+
inner: Mutex::new(Device {
26+
name,
27+
io_sender,
28+
power_sender,
29+
}),
30+
}
31+
}
32+
}
33+
34+
impl<I: io_expander::EventSender, P: power::EventSender> power::Device for Device<I, P> {
35+
fn name(&self) -> &str {
36+
self.name
37+
}
38+
39+
async fn accept_contract(&mut self) {
40+
log::info!("{}: Contract accepted", self.name);
41+
}
42+
43+
async fn disconnect(&mut self) {
44+
log::info!("{}: Device disconnected", self.name);
45+
}
46+
}
47+
48+
impl<I: io_expander::EventSender, P: power::EventSender> io_expander::Device for Device<I, P> {
49+
fn name(&self) -> &str {
50+
self.name
51+
}
52+
53+
async fn set_level(&mut self, pin: u8, value: bool) {
54+
log::info!("{}: Set pin {} to level {}", self.name, pin, value);
55+
}
56+
}
57+
58+
#[embassy_executor::task]
59+
async fn run() {
60+
let power_channel0: PubSubChannel<NoopRawMutex, power::Event, 4, 1, 1> = PubSubChannel::new();
61+
let power_receiver0 = power::Receiver::new(power_channel0.dyn_subscriber().unwrap());
62+
let power_sender0 = power::Sender::new(power_channel0.dyn_immediate_publisher());
63+
64+
let io_channel0: PubSubChannel<NoopRawMutex, io_expander::InterruptEvent, 4, 1, 1> = PubSubChannel::new();
65+
let io_receiver0 = io_expander::Receiver::new(io_channel0.dyn_subscriber().unwrap());
66+
let io_sender0 = io_expander::Sender::new(io_channel0.dyn_immediate_publisher());
67+
let device0 = DeviceContainer::new("Device0", io_sender0, power_sender0);
68+
69+
let power_channel1: PubSubChannel<NoopRawMutex, power::Event, 4, 1, 1> = PubSubChannel::new();
70+
let power_receiver1 = power::Receiver::new(power_channel1.dyn_subscriber().unwrap());
71+
let power_sender1 = power::Sender::new(power_channel1.dyn_immediate_publisher());
72+
73+
let io_channel1: PubSubChannel<NoopRawMutex, io_expander::InterruptEvent, 4, 1, 1> = PubSubChannel::new();
74+
let io_receiver1 = io_expander::Receiver::new(io_channel1.dyn_subscriber().unwrap());
75+
let io_sender1 = io_expander::Sender::new(io_channel1.dyn_immediate_publisher());
76+
let device1 = DeviceContainer::new("Device1", io_sender1, power_sender1);
77+
78+
let mut power_devices = [(power_receiver0, &device0.inner), (power_receiver1, &device1.inner)];
79+
let mut power_service = power::ServiceImplementation::new(&mut power_devices);
80+
81+
let mut io_devices = [(io_receiver0, &device0.inner), (io_receiver1, &device1.inner)];
82+
let mut io_service = io_expander::ServiceImplementation::new(&mut io_devices);
83+
84+
join3(
85+
async {
86+
loop {
87+
let event = io_service.wait_next().await;
88+
io_service.process_event(event).await;
89+
}
90+
},
91+
async {
92+
loop {
93+
let event = power_service.wait_next().await;
94+
power_service.process_event(event).await;
95+
}
96+
},
97+
async {
98+
device0.inner.lock().await.power_sender.on_plug(1000);
99+
device0.inner.lock().await.io_sender.on_interrupt(0, true);
100+
Timer::after_millis(500).await;
101+
102+
device1.inner.lock().await.power_sender.on_plug(2000);
103+
Timer::after_millis(500).await;
104+
105+
device0.inner.lock().await.power_sender.on_unplug();
106+
Timer::after_millis(500).await;
107+
108+
device1.inner.lock().await.io_sender.on_interrupt(0, true);
109+
device1.inner.lock().await.power_sender.on_unplug();
110+
Timer::after_millis(500).await;
111+
},
112+
)
113+
.await;
114+
}
115+
116+
pub fn main() {
117+
env_logger::builder().filter_level(log::LevelFilter::Trace).init();
118+
119+
static EXECUTOR: StaticCell<Executor> = StaticCell::new();
120+
let executor = EXECUTOR.init(Executor::new());
121+
executor.run(|spawner| {
122+
spawner.must_spawn(run());
123+
});
124+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use std::{future::Future, pin::pin};
2+
3+
use embassy_futures::select::select_slice;
4+
use embassy_sync::{
5+
blocking_mutex::raw::NoopRawMutex,
6+
mutex::Mutex,
7+
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
8+
};
9+
10+
use log::{info, warn};
11+
12+
/// Receive events from a [`Device`]
13+
pub trait EventReceiver {
14+
fn wait_next(&mut self) -> impl Future<Output = Event>;
15+
}
16+
17+
pub trait EventSender {
18+
fn on_plug(&self, power_mw: i32);
19+
fn on_unplug(&self);
20+
}
21+
22+
/// Power device trait
23+
pub trait Device {
24+
fn name(&self) -> &str;
25+
26+
fn accept_contract(&mut self) -> impl Future<Output = ()>;
27+
fn disconnect(&mut self) -> impl Future<Output = ()>;
28+
}
29+
30+
#[derive(Copy, Clone, Debug)]
31+
pub struct NewContract {
32+
pub power_mw: i32,
33+
}
34+
35+
#[derive(Copy, Clone, Debug)]
36+
pub enum Event {
37+
Plug(NewContract),
38+
Unplug,
39+
}
40+
41+
struct CurrentContract<'device, D: Device> {
42+
power_mw: i32,
43+
connected_device: &'device Mutex<NoopRawMutex, D>,
44+
}
45+
46+
pub struct Sender<'channel> {
47+
publisher: DynImmediatePublisher<'channel, Event>,
48+
}
49+
50+
impl<'channel> Sender<'channel> {
51+
pub fn new(publisher: DynImmediatePublisher<'channel, Event>) -> Self {
52+
Self { publisher }
53+
}
54+
}
55+
56+
pub struct Receiver<'channel> {
57+
subscriber: DynSubscriber<'channel, Event>,
58+
}
59+
60+
impl<'channel> Receiver<'channel> {
61+
pub fn new(subscriber: DynSubscriber<'channel, Event>) -> Self {
62+
Self { subscriber }
63+
}
64+
}
65+
66+
impl EventReceiver for Receiver<'_> {
67+
async fn wait_next(&mut self) -> Event {
68+
loop {
69+
match self.subscriber.next_message().await {
70+
WaitResult::Message(msg) => return msg,
71+
WaitResult::Lagged(n) => {
72+
warn!("Receiver lagged by {} messages", n);
73+
}
74+
}
75+
}
76+
}
77+
}
78+
79+
impl EventSender for Sender<'_> {
80+
fn on_plug(&self, power_mw: i32) {
81+
self.publisher.publish_immediate(Event::Plug(NewContract { power_mw }));
82+
}
83+
84+
fn on_unplug(&self) {
85+
self.publisher.publish_immediate(Event::Unplug);
86+
}
87+
}
88+
89+
pub struct ServiceImplementation<'storage, 'device, D: Device, R: EventReceiver> {
90+
current_connection: Option<CurrentContract<'device, D>>,
91+
devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)],
92+
}
93+
94+
const MAX_SUPPORTED_DEVICES: usize = 4;
95+
96+
impl<'storage, 'device, D: Device, R: EventReceiver> ServiceImplementation<'storage, 'device, D, R> {
97+
pub fn new(devices: &'storage mut [(R, &'device Mutex<NoopRawMutex, D>)]) -> Self {
98+
Self {
99+
devices,
100+
current_connection: None,
101+
}
102+
}
103+
104+
pub async fn wait_next(&mut self) -> (&'device Mutex<NoopRawMutex, D>, Event) {
105+
let futures =
106+
heapless::Vec::<_, MAX_SUPPORTED_DEVICES>::from_iter(self.devices.iter_mut().map(|(r, _)| r.wait_next()));
107+
108+
let (event, index) = select_slice(pin!(futures)).await;
109+
(self.devices[index].1, event)
110+
}
111+
112+
pub async fn process_event(&mut self, event: (&'device Mutex<NoopRawMutex, D>, Event)) {
113+
let mut event_device = event.0.lock().await;
114+
match event.1 {
115+
Event::Plug(data) => {
116+
info!("{} connected with contract: {:?}", event_device.name(), data.power_mw);
117+
if let Some(current) = &self.current_connection {
118+
if data.power_mw > current.power_mw {
119+
info!("New contract has higher power, switching");
120+
current.connected_device.lock().await.disconnect().await;
121+
122+
self.current_connection = Some(CurrentContract {
123+
power_mw: data.power_mw,
124+
connected_device: event.0,
125+
});
126+
event_device.accept_contract().await;
127+
} else {
128+
info!("New contract has lower or equal power, not switching");
129+
}
130+
} else {
131+
info!("No current contract, accepting new one");
132+
self.current_connection = Some(CurrentContract {
133+
power_mw: data.power_mw,
134+
connected_device: event.0,
135+
});
136+
event_device.accept_contract().await;
137+
}
138+
}
139+
Event::Unplug => {
140+
info!("{} disconnected", event_device.name());
141+
if let Some(current) = &self.current_connection {
142+
if std::ptr::eq(current.connected_device, event.0) {
143+
info!("Current device disconnected");
144+
self.current_connection = None;
145+
} else {
146+
info!("A non-connected device unplugged, nothing to do");
147+
}
148+
}
149+
}
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)