Skip to content

Commit a1f2713

Browse files
committed
dbus: networkmanager: also change link status update to async/.await
This makes it so we do not use two different patterns for async code in the same file. Signed-off-by: Leonard Göhrs <[email protected]>
1 parent 5631f87 commit a1f2713

File tree

1 file changed

+82
-158
lines changed

1 file changed

+82
-158
lines changed

src/dbus/networkmanager.rs

Lines changed: 82 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ mod optional_includes {
4141
pub(super) use anyhow::{anyhow, Result};
4242
pub(super) use async_std::stream::StreamExt;
4343
pub(super) use async_std::task::sleep;
44-
pub(super) use futures::{future::FutureExt, pin_mut, select};
44+
pub(super) use futures::{future::FutureExt, select};
4545
pub(super) use log::{info, trace};
4646
pub(super) use std::time::Duration;
47-
pub(super) use zbus::{Connection, PropertyStream};
47+
pub(super) use zbus::Connection;
4848
pub(super) use zvariant::OwnedObjectPath;
4949

5050
pub(super) use super::devices::{DeviceProxy, WiredProxy, NM_DEVICE_STATE_ACTIVATED};
@@ -61,101 +61,6 @@ pub struct LinkInfo {
6161
pub carrier: bool,
6262
}
6363

64-
#[cfg(not(feature = "demo_mode"))]
65-
async fn path_from_interface(con: &Connection, interface: &str) -> Result<OwnedObjectPath> {
66-
let proxy = NetworkManagerProxy::new(con).await?;
67-
let device_paths = proxy.get_devices().await?;
68-
69-
for path in device_paths {
70-
let device_proxy = DeviceProxy::builder(con).path(&path)?.build().await?;
71-
72-
let interface_name = device_proxy.interface().await?; // name
73-
74-
// Is this the interface we are interested in?
75-
if interface_name == interface {
76-
return Ok(path);
77-
}
78-
}
79-
Err(anyhow!("No interface found: {}", interface))
80-
}
81-
82-
#[cfg(not(feature = "demo_mode"))]
83-
async fn get_link_info(con: &Connection, path: &str) -> Result<LinkInfo> {
84-
let eth_proxy = WiredProxy::builder(con).path(path)?.build().await?;
85-
86-
let speed = eth_proxy.speed().await?;
87-
let carrier = eth_proxy.carrier().await?;
88-
89-
let info = LinkInfo { speed, carrier };
90-
91-
Ok(info)
92-
}
93-
94-
#[cfg(not(feature = "demo_mode"))]
95-
pub struct LinkStream<'a> {
96-
pub interface: String,
97-
_con: Arc<Connection>,
98-
speed: PropertyStream<'a, u32>,
99-
carrier: PropertyStream<'a, bool>,
100-
data: LinkInfo,
101-
}
102-
103-
#[cfg(not(feature = "demo_mode"))]
104-
impl<'a> LinkStream<'a> {
105-
pub async fn new(con: Arc<Connection>, interface: &str) -> Result<LinkStream<'a>> {
106-
let path = path_from_interface(&con, interface)
107-
.await?
108-
.as_str()
109-
.to_string();
110-
111-
let eth_proxy = WiredProxy::builder(&con)
112-
.path(path.clone())?
113-
.build()
114-
.await?;
115-
116-
let speed = eth_proxy.receive_speed_changed().await;
117-
let carrier = eth_proxy.receive_carrier_changed().await;
118-
119-
let info = get_link_info(&con, path.as_str()).await?;
120-
121-
Ok(Self {
122-
interface: interface.to_string(),
123-
_con: con,
124-
speed,
125-
carrier,
126-
data: info,
127-
})
128-
}
129-
130-
pub fn now(&self) -> LinkInfo {
131-
self.data.clone()
132-
}
133-
134-
pub async fn next(&mut self) -> Result<LinkInfo> {
135-
let speed = StreamExt::next(&mut self.speed).fuse();
136-
let carrier = StreamExt::next(&mut self.carrier).fuse();
137-
138-
pin_mut!(speed, carrier);
139-
select! {
140-
speed2 = speed => {
141-
if let Some(s) = speed2 {
142-
let s = s.get().await?;
143-
trace!("update speed: {} {:?}", self.interface, s);
144-
self.data.speed = s;
145-
}
146-
},
147-
carrier2 = carrier => {
148-
if let Some(c) = carrier2 {
149-
let c = c.get().await?;
150-
trace!("update carrier: {} {:?}", self.interface, c);
151-
self.data.carrier = c;
152-
}
153-
},
154-
};
155-
Ok(self.data.clone())
156-
}
157-
}
158-
15964
#[cfg(not(feature = "demo_mode"))]
16065
async fn get_device_path(conn: &Arc<Connection>, interface_name: &str) -> OwnedObjectPath {
16166
let manager = loop {
@@ -181,6 +86,65 @@ async fn get_device_path(conn: &Arc<Connection>, interface_name: &str) -> OwnedO
18186
}
18287
}
18388

89+
#[cfg(not(feature = "demo_mode"))]
90+
async fn handle_link_updates(
91+
conn: &Arc<Connection>,
92+
topic: Arc<Topic<LinkInfo>>,
93+
interface_name: &str,
94+
led: Arc<Topic<BlinkPattern>>,
95+
) -> Result<()> {
96+
let device_path = get_device_path(conn, interface_name).await;
97+
let device = WiredProxy::builder(conn).path(device_path)?.build().await?;
98+
99+
let mut carrier_changes = device.receive_carrier_changed().await;
100+
let mut speed_changes = device.receive_speed_changed().await;
101+
102+
let mut info = LinkInfo {
103+
carrier: carrier_changes
104+
.next()
105+
.await
106+
.ok_or_else(|| anyhow!("Unexpected end of carrier subscription"))?
107+
.get()
108+
.await?,
109+
speed: speed_changes
110+
.next()
111+
.await
112+
.ok_or_else(|| anyhow!("Unexpected end of speed subscription"))?
113+
.get()
114+
.await?,
115+
};
116+
117+
loop {
118+
// The two color LED on the DUT interface is under the control of
119+
// the switch IC. For 100MBit/s and 1GBit/s it lights in distinct
120+
// colors, but for 10MBit/s it is just off.
121+
// Build the most round-about link speed indicator ever so that we
122+
// have speed indication for 10MBit/s.
123+
led.set({
124+
let led_brightness = if info.speed == 10 { 1.0 } else { 0.0 };
125+
126+
BlinkPattern::solid(led_brightness)
127+
});
128+
129+
topic.set(info.clone());
130+
131+
select! {
132+
carrier = carrier_changes.next().fuse() => {
133+
info.carrier = carrier
134+
.ok_or_else(|| anyhow!("Unexpected end of carrier subscription"))?
135+
.get()
136+
.await?;
137+
}
138+
speed = speed_changes.next().fuse() => {
139+
info.speed = speed
140+
.ok_or_else(|| anyhow!("Unexpected end of speed subscription"))?
141+
.get()
142+
.await?;
143+
}
144+
}
145+
}
146+
}
147+
184148
#[cfg(not(feature = "demo_mode"))]
185149
async fn handle_ipv4_updates(
186150
conn: &Arc<Connection>,
@@ -308,69 +272,29 @@ impl Network {
308272
) -> Self {
309273
let this = Self::setup_topics(bb);
310274

311-
{
312-
let conn = conn.clone();
313-
let dut_interface = this.dut_interface.clone();
314-
async_std::task::spawn(async move {
315-
let mut link_stream = loop {
316-
if let Ok(ls) = LinkStream::new(conn.clone(), "dut").await {
317-
break ls;
318-
}
319-
320-
sleep(Duration::from_secs(1)).await;
321-
};
322-
323-
dut_interface.set(link_stream.now());
324-
325-
while let Ok(info) = link_stream.next().await {
326-
// The two color LED on the DUT interface is under the control of
327-
// the switch IC. For 100MBit/s and 1GBit/s it lights in distinct
328-
// colors, but for 10MBit/s it is just off.
329-
// Build the most round-about link speed indicator ever so that we
330-
// have speed indication for 10MBit/s.
331-
let led_brightness = if info.speed == 10 { 1.0 } else { 0.0 };
332-
led_dut.set(BlinkPattern::solid(led_brightness));
333-
334-
dut_interface.set(info);
335-
}
336-
});
337-
}
338-
339-
{
340-
let conn = conn.clone();
341-
let uplink_interface = this.uplink_interface.clone();
342-
async_std::task::spawn(async move {
343-
let mut link_stream = loop {
344-
if let Ok(ls) = LinkStream::new(conn.clone(), "uplink").await {
345-
break ls;
346-
}
347-
348-
sleep(Duration::from_secs(1)).await;
349-
};
350-
351-
uplink_interface.set(link_stream.now());
352-
353-
while let Ok(info) = link_stream.next().await {
354-
// See the equivalent section on the uplink interface on why
355-
// this is here.
356-
let led_brightness = if info.speed == 10 { 1.0 } else { 0.0 };
357-
led_uplink.set(BlinkPattern::solid(led_brightness));
358-
359-
uplink_interface.set(info);
360-
}
361-
});
362-
}
275+
let conn_task = conn.clone();
276+
let dut_interface = this.dut_interface.clone();
277+
async_std::task::spawn(async move {
278+
handle_link_updates(&conn_task, dut_interface, "dut", led_dut)
279+
.await
280+
.unwrap();
281+
});
363282

364-
{
365-
let conn = conn.clone();
366-
let bridge_interface = this.bridge_interface.clone();
283+
let conn_task = conn.clone();
284+
let uplink_interface = this.uplink_interface.clone();
285+
async_std::task::spawn(async move {
286+
handle_link_updates(&conn_task, uplink_interface, "uplink", led_uplink)
287+
.await
288+
.unwrap();
289+
});
367290

368-
async_std::task::spawn(async move {
369-
handle_ipv4_updates(&conn, bridge_interface, "tac-bridge")
370-
.await
371-
.unwrap();
372-
});
373-
}
291+
let conn_task = conn.clone();
292+
let bridge_interface = this.bridge_interface.clone();
293+
async_std::task::spawn(async move {
294+
handle_ipv4_updates(&conn_task, bridge_interface, "tac-bridge")
295+
.await
296+
.unwrap();
297+
});
374298

375299
this
376300
}

0 commit comments

Comments
 (0)