@@ -42,13 +42,12 @@ mod optional_includes {
42
42
pub ( super ) use async_std:: stream:: StreamExt ;
43
43
pub ( super ) use async_std:: task:: sleep;
44
44
pub ( super ) use futures:: { future:: FutureExt , pin_mut, select} ;
45
- pub ( super ) use log:: trace;
46
- pub ( super ) use std:: convert:: TryInto ;
45
+ pub ( super ) use log:: { info, trace} ;
47
46
pub ( super ) use std:: time:: Duration ;
48
47
pub ( super ) use zbus:: { Connection , PropertyStream } ;
49
- pub ( super ) use zvariant:: { ObjectPath , OwnedObjectPath } ;
48
+ pub ( super ) use zvariant:: OwnedObjectPath ;
50
49
51
- pub ( super ) use super :: devices:: { DeviceProxy , WiredProxy } ;
50
+ pub ( super ) use super :: devices:: { DeviceProxy , WiredProxy , NM_DEVICE_STATE_ACTIVATED } ;
52
51
pub ( super ) use super :: ipv4_config:: IP4ConfigProxy ;
53
52
pub ( super ) use super :: manager:: NetworkManagerProxy ;
54
53
}
@@ -92,25 +91,6 @@ async fn get_link_info(con: &Connection, path: &str) -> Result<LinkInfo> {
92
91
Ok ( info)
93
92
}
94
93
95
- #[ cfg( not( feature = "demo_mode" ) ) ]
96
- pub async fn get_ip4_address < ' a , P > ( con : & Connection , path : P ) -> Result < Vec < String > >
97
- where
98
- P : TryInto < ObjectPath < ' a > > ,
99
- P :: Error : Into < zbus:: Error > ,
100
- {
101
- let ip_4_proxy = IP4ConfigProxy :: builder ( con) . path ( path) ?. build ( ) . await ?;
102
-
103
- let ip_address = ip_4_proxy. address_data ( ) . await ?;
104
- trace ! ( "get IPv4: {:?}" , ip_address) ;
105
- let ip_address = ip_address
106
- . get ( 0 )
107
- . and_then ( |e| e. get ( "address" ) )
108
- . and_then ( |e| e. downcast_ref :: < zvariant:: Str > ( ) )
109
- . map ( |e| e. as_str ( ) )
110
- . ok_or ( anyhow ! ( "IP not found" ) ) ?;
111
- Ok ( Vec :: from ( [ ip_address. to_string ( ) ] ) )
112
- }
113
-
114
94
#[ cfg( not( feature = "demo_mode" ) ) ]
115
95
pub struct LinkStream < ' a > {
116
96
pub interface : String ,
@@ -177,62 +157,108 @@ impl<'a> LinkStream<'a> {
177
157
}
178
158
179
159
#[ cfg( not( feature = "demo_mode" ) ) ]
180
- pub struct IpStream < ' a > {
181
- pub interface : String ,
182
- _con : Arc < Connection > ,
183
- ip_4_config : PropertyStream < ' a , OwnedObjectPath > ,
184
- path : String ,
185
- }
186
-
187
- #[ cfg( not( feature = "demo_mode" ) ) ]
188
- impl < ' a > IpStream < ' a > {
189
- pub async fn new ( con : Arc < Connection > , interface : & str ) -> Result < IpStream < ' a > > {
190
- let path = path_from_interface ( & con, interface)
191
- . await ?
192
- . as_str ( )
193
- . to_string ( ) ;
160
+ async fn get_device_path ( conn : & Arc < Connection > , interface_name : & str ) -> OwnedObjectPath {
161
+ let manager = loop {
162
+ match NetworkManagerProxy :: new ( conn) . await {
163
+ Ok ( m) => break m,
164
+ Err ( _e) => {
165
+ info ! ( "Failed to connect to NetworkManager via DBus. Retry in 1s" ) ;
166
+ }
167
+ }
194
168
195
- let device_proxy = DeviceProxy :: builder ( & con)
196
- . path ( path. clone ( ) ) ?
197
- . build ( )
198
- . await ?;
169
+ sleep ( Duration :: from_secs ( 1 ) ) . await ;
170
+ } ;
199
171
200
- let ip_4_config = device_proxy. receive_ip4_config_changed ( ) . await ;
172
+ loop {
173
+ match manager. get_device_by_ip_iface ( interface_name) . await {
174
+ Ok ( d) => break d,
175
+ Err ( _e) => {
176
+ info ! ( "Failed to get interface {interface_name} from NetworkManager. Retry in 1s." )
177
+ }
178
+ }
201
179
202
- Ok ( Self {
203
- interface : interface. to_string ( ) ,
204
- _con : con,
205
- ip_4_config,
206
- path : path. to_string ( ) ,
207
- } )
180
+ sleep ( Duration :: from_secs ( 1 ) ) . await ;
208
181
}
182
+ }
183
+
184
+ #[ cfg( not( feature = "demo_mode" ) ) ]
185
+ async fn handle_ipv4_updates (
186
+ conn : & Arc < Connection > ,
187
+ topic : Arc < Topic < Vec < String > > > ,
188
+ interface_name : & str ,
189
+ ) -> Result < ( ) > {
190
+ let device_path = get_device_path ( conn, interface_name) . await ;
191
+ let device = DeviceProxy :: builder ( conn)
192
+ . path ( device_path) ?
193
+ . build ( )
194
+ . await ?;
195
+
196
+ let mut state_changes = device. receive_state_property_changed ( ) . await ;
197
+
198
+ loop {
199
+ // The NetworkManager DBus documentation says the Ip4Config property is
200
+ // "Only valid when the device is in the NM_DEVICE_STATE_ACTIVATED state".
201
+ // Loop until that is the case.
202
+ ' wait_activated: loop {
203
+ let state = state_changes
204
+ . next ( )
205
+ . await
206
+ . ok_or_else ( || anyhow ! ( "Unexpected end of state change subscription" ) ) ?
207
+ . get ( )
208
+ . await ?;
209
+
210
+ trace ! ( "Interface {interface_name} changed state to {state}" ) ;
211
+
212
+ if state == NM_DEVICE_STATE_ACTIVATED {
213
+ break ' wait_activated;
214
+ }
215
+ }
209
216
210
- pub async fn now ( & mut self , con : & Connection ) -> Result < Vec < String > > {
211
- let device_proxy = DeviceProxy :: builder ( con )
212
- . path ( self . path . as_str ( ) ) ?
217
+ let ip4_config_path = device . ip4_config ( ) . await ? ;
218
+ let ip4_config = IP4ConfigProxy :: builder ( conn )
219
+ . path ( ip4_config_path ) ?
213
220
. build ( )
214
221
. await ?;
215
222
216
- let ip_4_config = device_proxy . ip4_config ( ) . await ? ;
223
+ let mut address_data_changes = ip4_config. receive_address_data_changed ( ) . await ;
217
224
218
- Ok ( get_ip4_address ( con, ip_4_config)
219
- . await
220
- . unwrap_or_else ( |_e| Vec :: new ( ) ) )
221
- }
225
+ ' wait_deactivated: loop {
226
+ select ! {
227
+ new_state = state_changes. next( ) . fuse( ) => {
228
+ let state = new_state
229
+ . ok_or_else( || anyhow!( "Unexpected end of state change subscription" ) ) ?
230
+ . get( )
231
+ . await ?;
222
232
223
- pub async fn next ( & mut self , con : & Connection ) -> Result < Vec < String > > {
224
- let ip_4_config = StreamExt :: next ( & mut self . ip_4_config ) . await ;
233
+ trace!( "Interface {interface_name} changed state to {state}" ) ;
225
234
226
- if let Some ( path) = ip_4_config {
227
- let path = path. get ( ) . await ?;
228
- if let Ok ( ips) = get_ip4_address ( con, & path) . await {
229
- trace ! ( "updata ip: {} {:?}" , self . interface, ips) ;
230
- return Ok ( ips) ;
231
- } else {
232
- return Ok ( Vec :: new ( ) ) ;
235
+ topic. set( Vec :: new( ) ) ;
236
+
237
+ if state != NM_DEVICE_STATE_ACTIVATED {
238
+ break ' wait_deactivated;
239
+ }
240
+ }
241
+ address_data = address_data_changes. next( ) . fuse( ) => {
242
+ let address_data = address_data
243
+ . ok_or_else( || anyhow!( "Unexpected end of address data update stream" ) ) ?
244
+ . get( )
245
+ . await ?;
246
+
247
+ let addresses: Vec <String > = address_data
248
+ . iter( )
249
+ . filter_map( |a| {
250
+ a. get( "address" )
251
+ . and_then( |e| e. downcast_ref:: <zvariant:: Str >( ) )
252
+ . map( |e| e. as_str( ) . to_owned( ) )
253
+ } )
254
+ . collect( ) ;
255
+
256
+ trace!( "Interface {interface_name} got new IP addresses: {addresses:?}" ) ;
257
+
258
+ topic. set( addresses) ;
259
+ }
233
260
}
234
261
}
235
- Err ( anyhow ! ( "No IP found" ) )
236
262
}
237
263
}
238
264
@@ -245,7 +271,7 @@ pub struct Network {
245
271
impl Network {
246
272
fn setup_topics ( bb : & mut BrokerBuilder ) -> Self {
247
273
Self {
248
- bridge_interface : bb. topic_ro ( "/v1/tac/network/interface/tac-bridge" , None ) ,
274
+ bridge_interface : bb. topic_ro ( "/v1/tac/network/interface/tac-bridge" , Some ( Vec :: new ( ) ) ) ,
249
275
dut_interface : bb. topic_ro ( "/v1/tac/network/interface/dut" , None ) ,
250
276
uplink_interface : bb. topic_ro ( "/v1/tac/network/interface/uplink" , None ) ,
251
277
}
@@ -338,20 +364,11 @@ impl Network {
338
364
{
339
365
let conn = conn. clone ( ) ;
340
366
let bridge_interface = this. bridge_interface . clone ( ) ;
341
- async_std:: task:: spawn ( async move {
342
- let mut ip_stream = loop {
343
- if let Ok ( ips) = IpStream :: new ( conn. clone ( ) , "tac-bridge" ) . await {
344
- break ips;
345
- }
346
-
347
- sleep ( Duration :: from_secs ( 1 ) ) . await ;
348
- } ;
349
-
350
- bridge_interface. set ( ip_stream. now ( & conn) . await . unwrap ( ) ) ;
351
367
352
- while let Ok ( info) = ip_stream. next ( & conn) . await {
353
- bridge_interface. set ( info) ;
354
- }
368
+ async_std:: task:: spawn ( async move {
369
+ handle_ipv4_updates ( & conn, bridge_interface, "tac-bridge" )
370
+ . await
371
+ . unwrap ( ) ;
355
372
} ) ;
356
373
}
357
374
0 commit comments