Skip to content

Commit a1f4373

Browse files
Connection timeout (#11)
* Handling Connection timeout in direct mode * Handling Connection timeout in relayed mode * remove the implementaion of Send trait * refactor: use Instant::now() and make interface module pub(crate)
1 parent 0e87e37 commit a1f4373

File tree

9 files changed

+200
-136
lines changed

9 files changed

+200
-136
lines changed

examples/rust/cycle-benchmark/src/bin/cycle_bench.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,24 @@ fn run_as_primary(params: Params, app_config: ApplicationConfig) {
5151
match signalling {
5252
SignallingType::DirectMpsc => {
5353
let config = direct_mpsc::make_primary_config(params, app_config);
54-
direct_mpsc::Primary::new(config).run().unwrap();
54+
direct_mpsc::Primary::new(config)
55+
.expect("failed to create mpsc primary")
56+
.run()
57+
.unwrap();
5558
}
5659
signalling @ SignallingType::DirectTcp | signalling @ SignallingType::DirectUnix => {
5760
let config = direct_sockets::make_primary_config(params, app_config, signalling);
58-
direct_sockets::Primary::new(config).run().unwrap();
61+
direct_sockets::Primary::new(config)
62+
.expect("failed to create direct socket primary")
63+
.run()
64+
.unwrap();
5965
}
6066
signalling @ SignallingType::RelayedTcp | signalling @ SignallingType::RelayedUnix => {
6167
let config = relayed_sockets::make_primary_config(params, app_config, signalling);
62-
relayed_sockets::Primary::new(config).run().unwrap();
68+
relayed_sockets::Primary::new(config)
69+
.expect("failed to create relayed socket primary")
70+
.run()
71+
.unwrap();
6372
}
6473
}
6574
}
@@ -233,6 +242,7 @@ mod direct_sockets {
233242
recorder_ids: app_config.recorders(),
234243
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
235244
timeout: Duration::from_secs(10),
245+
connection_timeout: Duration::from_secs(10),
236246
endpoint: endpoint(&app_config, signalling),
237247
}
238248
}
@@ -314,6 +324,7 @@ mod relayed_sockets {
314324
recorder_ids: app_config.recorders(),
315325
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
316326
timeout: Duration::from_secs(10),
327+
connection_timeout: Duration::from_secs(10),
317328
bind_address_senders: endpoints.0,
318329
bind_address_receivers: endpoints.1,
319330
id: agent_id,

examples/rust/mini-adas/src/bin/adas_primary.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ fn main() {
4242
MAX_ADDITIONAL_SUBSCRIBERS,
4343
);
4444

45-
// Setup primary
46-
let mut primary = cfg::Primary::new(config);
47-
48-
// Run primary
49-
primary.run().unwrap()
45+
// Setup and run primary
46+
cfg::Primary::new(config)
47+
.unwrap_or_else(|err| {
48+
feo_log::error!("Failed to initialize primary agent: {err:?}");
49+
std::process::exit(1);
50+
})
51+
.run()
52+
.unwrap();
5053
}
5154

5255
/// Parameters of the primary
@@ -102,6 +105,7 @@ mod cfg {
102105
recorder_ids: vec![],
103106
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
104107
timeout: Duration::from_secs(10),
108+
connection_timeout: Duration::from_secs(10),
105109
}
106110
}
107111
}
@@ -126,6 +130,7 @@ mod cfg {
126130
recorder_ids: params.recorder_ids,
127131
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
128132
timeout: Duration::from_secs(10),
133+
connection_timeout: Duration::from_secs(10),
129134
endpoint: NodeAddress::Tcp(BIND_ADDR),
130135
}
131136
}
@@ -151,6 +156,7 @@ mod cfg {
151156
recorder_ids: params.recorder_ids,
152157
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
153158
timeout: Duration::from_secs(10),
159+
connection_timeout: Duration::from_secs(10),
154160
endpoint: NodeAddress::UnixSocket(socket_paths().0),
155161
}
156162
}
@@ -186,6 +192,7 @@ mod cfg {
186192
recorder_ids: params.recorder_ids,
187193
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
188194
timeout: Duration::from_secs(10),
195+
connection_timeout: Duration::from_secs(10),
189196
bind_address_senders: NodeAddress::Tcp(BIND_ADDR),
190197
bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2),
191198
id: AGENT_ID,
@@ -225,6 +232,7 @@ mod cfg {
225232
recorder_ids: params.recorder_ids,
226233
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
227234
timeout: Duration::from_secs(10),
235+
connection_timeout: Duration::from_secs(10),
228236
bind_address_senders: NodeAddress::UnixSocket(socket_paths().0),
229237
bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1),
230238
id: AGENT_ID,

feo/src/agent/direct/primary.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub struct PrimaryConfig {
4141
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
4242
/// Receive timeout of the scheduler's connector
4343
pub timeout: Duration,
44+
/// Timeout for waiting on initial connections from workers/recorders.
45+
pub connection_timeout: Duration,
4446
/// Endpoint on which the connector of the scheduler waits for connections
4547
pub endpoint: NodeAddress,
4648
}
@@ -55,14 +57,15 @@ pub struct Primary {
5557

5658
impl Primary {
5759
/// Create a new instance
58-
pub fn new(config: PrimaryConfig) -> Self {
60+
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
5961
let PrimaryConfig {
6062
cycle_time,
6163
activity_dependencies,
6264
recorder_ids,
6365
endpoint,
6466
worker_assignments,
6567
timeout,
68+
connection_timeout,
6669
} = config;
6770

6871
// Create worker threads first so that the connector of the scheduler can connect
@@ -100,14 +103,16 @@ impl Primary {
100103
addr,
101104
activity_dependencies.keys().cloned(),
102105
recorder_ids.iter().cloned(),
106+
connection_timeout,
103107
)) as Box<dyn ConnectScheduler>,
104108
NodeAddress::UnixSocket(path) => Box::new(UnixSchedulerConnector::new(
105109
&path,
106110
activity_dependencies.keys().cloned(),
107111
recorder_ids.iter().cloned(),
112+
connection_timeout,
108113
)) as Box<dyn ConnectScheduler>,
109114
};
110-
connector.connect_remotes().expect("failed to connect");
115+
connector.connect_remotes()?;
111116

112117
let scheduler = Scheduler::new(
113118
cycle_time,
@@ -117,10 +122,10 @@ impl Primary {
117122
recorder_ids,
118123
);
119124

120-
Self {
125+
Ok(Self {
121126
scheduler,
122127
_worker_threads,
123-
}
128+
})
124129
}
125130

126131
/// Run the agent

feo/src/agent/direct/primary_mpsc.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub struct Primary {
5151

5252
impl Primary {
5353
/// Create a new instance
54-
pub fn new(config: PrimaryConfig) -> Self {
54+
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
5555
let PrimaryConfig {
5656
cycle_time,
5757
activity_dependencies,
@@ -89,7 +89,7 @@ impl Primary {
8989
})
9090
.collect();
9191

92-
connector.connect_remotes().expect("failed to connect");
92+
connector.connect_remotes()?;
9393

9494
let scheduler = Scheduler::new(
9595
cycle_time,
@@ -99,10 +99,10 @@ impl Primary {
9999
recorder_ids,
100100
);
101101

102-
Self {
102+
Ok(Self {
103103
scheduler,
104104
_worker_threads,
105-
}
105+
})
106106
}
107107

108108
/// Run the agent

feo/src/agent/relayed/primary.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub struct PrimaryConfig {
4242
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
4343
/// Receive timeout of the scheduler's connector
4444
pub timeout: Duration,
45+
/// Timeout for waiting on initial connections from workers/recorders.
46+
pub connection_timeout: Duration,
4547
/// The socket address to which secondary agents' senders shall connect
4648
pub bind_address_senders: NodeAddress,
4749
/// The socket address to which secondary agents' receivers shall connect
@@ -62,7 +64,7 @@ pub struct Primary {
6264

6365
impl Primary {
6466
/// Create a new instance
65-
pub fn new(config: PrimaryConfig) -> Self {
67+
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
6668
let PrimaryConfig {
6769
id,
6870
cycle_time,
@@ -72,6 +74,7 @@ impl Primary {
7274
bind_address_receivers,
7375
worker_assignments,
7476
timeout,
77+
connection_timeout,
7578
worker_agent_map,
7679
activity_worker_map,
7780
} = config;
@@ -84,7 +87,7 @@ impl Primary {
8487
id,
8588
bind_senders,
8689
bind_receivers,
87-
timeout,
90+
connection_timeout,
8891
worker_agent_map,
8992
activity_worker_map,
9093
recorder_ids.clone(),
@@ -97,7 +100,7 @@ impl Primary {
97100
id,
98101
bind_senders,
99102
bind_receivers,
100-
timeout,
103+
connection_timeout,
101104
worker_agent_map,
102105
activity_worker_map,
103106
recorder_ids.clone(),
@@ -126,7 +129,7 @@ impl Primary {
126129
})
127130
.collect();
128131

129-
connector.connect_remotes().expect("failed to connect");
132+
connector.connect_remotes()?;
130133

131134
let scheduler = Scheduler::new(
132135
cycle_time,
@@ -136,10 +139,10 @@ impl Primary {
136139
recorder_ids,
137140
);
138141

139-
Self {
142+
Ok(Self {
140143
scheduler,
141144
_worker_threads,
142-
}
145+
})
143146
}
144147

145148
/// Run the agent

feo/src/signalling/direct/scheduler.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use alloc::vec::Vec;
2222
use core::net::SocketAddr;
2323
use core::time::Duration;
2424
use feo_log::warn;
25+
use feo_time::Instant;
2526
use mio::net::{TcpListener, UnixListener};
2627
use mio::{Events, Token};
2728
use std::collections::{HashMap, HashSet};
@@ -46,6 +47,7 @@ where
4647

4748
all_activities: Vec<ActivityId>,
4849
all_recorders: Vec<AgentId>,
50+
connection_timeout: Duration,
4951
}
5052

5153
impl<L> SchedulerConnector<L>
@@ -57,6 +59,7 @@ where
5759
server: SocketServer<L>,
5860
activity_ids: impl IntoIterator<Item = ActivityId>,
5961
recorder_ids: impl IntoIterator<Item = AgentId>,
62+
connection_timeout: Duration,
6063
) -> Self {
6164
let events = Events::with_capacity(32);
6265

@@ -73,6 +76,7 @@ where
7376
recorder_id_token_map,
7477
all_activities,
7578
all_recorders,
79+
connection_timeout,
7680
}
7781
}
7882
}
@@ -83,9 +87,10 @@ impl TcpSchedulerConnector {
8387
bind_address: SocketAddr,
8488
activity_ids: impl IntoIterator<Item = ActivityId>,
8589
recorder_ids: impl IntoIterator<Item = AgentId>,
90+
connection_timeout: Duration,
8691
) -> Self {
8792
let tcp_server = TcpServer::new(bind_address);
88-
Self::new_with_server(tcp_server, activity_ids, recorder_ids)
93+
Self::new_with_server(tcp_server, activity_ids, recorder_ids, connection_timeout)
8994
}
9095
}
9196

@@ -95,9 +100,10 @@ impl UnixSchedulerConnector {
95100
path: &Path,
96101
activity_ids: impl IntoIterator<Item = ActivityId>,
97102
recorder_ids: impl IntoIterator<Item = AgentId>,
103+
connection_timeout: Duration,
98104
) -> Self {
99105
let unix_server = UnixServer::new(path);
100-
Self::new_with_server(unix_server, activity_ids, recorder_ids)
106+
Self::new_with_server(unix_server, activity_ids, recorder_ids, connection_timeout)
101107
}
102108
}
103109

@@ -109,11 +115,19 @@ where
109115
let mut missing_activities: HashSet<ActivityId> =
110116
self.all_activities.iter().cloned().collect();
111117
let mut missing_recorders: HashSet<AgentId> = self.all_recorders.iter().cloned().collect();
118+
let start_time = Instant::now();
112119

113120
while !missing_activities.is_empty() || !missing_recorders.is_empty() {
114-
if let Some((token, signal)) = self
115-
.server
116-
.receive(&mut self.events, Duration::from_secs(1))
121+
let elapsed = start_time.elapsed();
122+
if elapsed >= self.connection_timeout {
123+
return Err(Error::Io((
124+
std::io::ErrorKind::TimedOut.into(),
125+
"CONNECTION_TIMEOUT",
126+
)));
127+
}
128+
let remaining_timeout = self.connection_timeout.saturating_sub(elapsed);
129+
// Wait for a new connection, but no longer than the remaining overall timeout.
130+
if let Some((token, signal)) = self.server.receive(&mut self.events, remaining_timeout)
117131
{
118132
match signal {
119133
ProtocolSignal::ActivityHello(activity_id) => {

0 commit comments

Comments
 (0)