Skip to content

Commit 628df66

Browse files
handling startup timeout (#21)
* handling startup timeout * allow Scheduler::new function to take more than element
1 parent ddf6d9f commit 628df66

File tree

7 files changed

+61
-10
lines changed

7 files changed

+61
-10
lines changed

Cargo.lock

Lines changed: 15 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/rust/cycle-benchmark/src/bin/cycle_bench.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ mod direct_mpsc {
195195
recorder_ids: vec![],
196196
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
197197
timeout: Duration::from_secs(10),
198+
startup_timeout: Duration::from_secs(10),
198199
}
199200
}
200201

@@ -245,6 +246,7 @@ mod direct_sockets {
245246
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
246247
timeout: Duration::from_secs(10),
247248
connection_timeout: Duration::from_secs(10),
249+
startup_timeout: Duration::from_secs(10),
248250
endpoint: endpoint(&app_config, signalling),
249251
activity_agent_map: app_config
250252
.activity_worker_map()
@@ -337,6 +339,7 @@ mod relayed_sockets {
337339
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
338340
timeout: Duration::from_secs(10),
339341
connection_timeout: Duration::from_secs(10),
342+
startup_timeout: Duration::from_secs(10),
340343
bind_address_senders: endpoints.0,
341344
bind_address_receivers: endpoints.1,
342345
id: agent_id,

examples/rust/mini-adas/src/bin/adas_primary.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ mod cfg {
106106
recorder_ids: vec![],
107107
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
108108
timeout: Duration::from_secs(10),
109+
startup_timeout: Duration::from_secs(10),
109110
}
110111
}
111112
}
@@ -144,6 +145,7 @@ mod cfg {
144145
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
145146
timeout: Duration::from_secs(10),
146147
connection_timeout: Duration::from_secs(10),
148+
startup_timeout: Duration::from_secs(10),
147149
endpoint: NodeAddress::Tcp(BIND_ADDR),
148150
activity_agent_map: activity_worker_map
149151
.iter()
@@ -190,6 +192,7 @@ mod cfg {
190192
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
191193
timeout: Duration::from_secs(10),
192194
connection_timeout: Duration::from_secs(10),
195+
startup_timeout: Duration::from_secs(10),
193196
endpoint: NodeAddress::UnixSocket(socket_paths().0),
194197
activity_agent_map: activity_worker_map
195198
.iter()
@@ -233,6 +236,7 @@ mod cfg {
233236
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
234237
timeout: Duration::from_secs(10),
235238
connection_timeout: Duration::from_secs(10),
239+
startup_timeout: Duration::from_secs(10),
236240
bind_address_senders: NodeAddress::Tcp(BIND_ADDR),
237241
bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2),
238242
id: AGENT_ID,
@@ -273,6 +277,7 @@ mod cfg {
273277
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
274278
timeout: Duration::from_secs(10),
275279
connection_timeout: Duration::from_secs(10),
280+
startup_timeout: Duration::from_secs(10),
276281
bind_address_senders: NodeAddress::UnixSocket(socket_paths().0),
277282
bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1),
278283
id: AGENT_ID,

feo/src/agent/direct/primary.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub struct PrimaryConfig {
4848
pub timeout: Duration,
4949
/// Timeout for waiting on initial connections from workers/recorders.
5050
pub connection_timeout: Duration,
51+
/// Timeout for waiting on activities to become ready during startup.
52+
pub startup_timeout: Duration,
5153
/// Endpoint on which the connector of the scheduler waits for connections
5254
pub endpoint: NodeAddress,
5355
/// Map of all activities to agent ids
@@ -72,6 +74,7 @@ impl Primary {
7274
endpoint,
7375
timeout,
7476
connection_timeout,
77+
startup_timeout,
7578
activity_agent_map,
7679
..
7780
} = config;
@@ -145,6 +148,7 @@ impl Primary {
145148
config.id,
146149
cycle_time,
147150
timeout,
151+
startup_timeout,
148152
activity_dependencies,
149153
connector,
150154
recorder_ids,

feo/src/agent/direct/primary_mpsc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub struct PrimaryConfig {
4444
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
4545
/// Receive timeout of the scheduler's connector
4646
pub timeout: Duration,
47+
/// Timeout for waiting on activities to become ready during startup.
48+
pub startup_timeout: Duration,
4749
}
4850

4951
/// Primary agent
@@ -62,6 +64,7 @@ impl Primary {
6264
activity_dependencies,
6365
recorder_ids,
6466
timeout,
67+
startup_timeout,
6568
..
6669
} = config;
6770

@@ -112,6 +115,7 @@ impl Primary {
112115
config.id,
113116
cycle_time,
114117
timeout,
118+
startup_timeout,
115119
activity_dependencies,
116120
connector,
117121
recorder_ids,

feo/src/agent/relayed/primary.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub struct PrimaryConfig {
4747
pub timeout: Duration,
4848
/// Timeout for waiting on initial connections from workers/recorders.
4949
pub connection_timeout: Duration,
50+
/// Timeout for waiting on activities to become ready during startup.
51+
pub startup_timeout: Duration,
5052
/// The socket address to which secondary agents' senders shall connect
5153
pub bind_address_senders: NodeAddress,
5254
/// The socket address to which secondary agents' receivers shall connect
@@ -80,6 +82,7 @@ impl Primary {
8082
worker_assignments,
8183
timeout,
8284
connection_timeout,
85+
startup_timeout,
8386
worker_agent_map,
8487
activity_worker_map,
8588
} = config;
@@ -152,6 +155,7 @@ impl Primary {
152155
id,
153156
cycle_time,
154157
timeout,
158+
startup_timeout,
155159
activity_dependencies,
156160
connector,
157161
recorder_ids,

feo/src/scheduler.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ pub(crate) struct Scheduler {
3737
cycle_time: feo_time::Duration,
3838
/// Timeout of receive function
3939
receive_timeout: core::time::Duration,
40-
40+
/// Timeout for waiting on activities to become ready during startup.
41+
startup_timeout: core::time::Duration,
4142
/// For each activity: list of activities it depends on
4243
activity_depends: HashMap<ActivityId, Vec<ActivityId>>,
4344
/// Map keeping track of activity states
@@ -55,10 +56,12 @@ pub(crate) struct Scheduler {
5556
}
5657

5758
impl Scheduler {
59+
#[allow(clippy::too_many_arguments)]
5860
pub(crate) fn new(
5961
agent_id: AgentId,
6062
feo_cycle_time: feo_time::Duration,
6163
receive_timeout: core::time::Duration,
64+
startup_timeout: core::time::Duration,
6265
activity_depends: HashMap<ActivityId, Vec<ActivityId>>,
6366
connector: Box<dyn ConnectScheduler>,
6467
recorder_ids: Vec<AgentId>,
@@ -84,6 +87,7 @@ impl Scheduler {
8487
agent_id,
8588
cycle_time: feo_cycle_time,
8689
receive_timeout,
90+
startup_timeout,
8791
activity_depends,
8892
connector,
8993
activity_states,
@@ -117,10 +121,28 @@ impl Scheduler {
117121
Self::startup_activity(activity_id, &self.recorder_ids, &mut self.connector).unwrap();
118122
}
119123

120-
// Wait until all activities have returned their ready signal
124+
// Wait until all activities have returned their ready signal, with a timeout.
125+
let startup_start = Instant::now();
121126
while !self.all_ready() {
122-
self.wait_next_ready()
123-
.expect("failed while waiting for ready signal");
127+
if startup_start.elapsed() > self.startup_timeout {
128+
let reason = alloc::format!(
129+
"Startup timeout of {:?} exceeded. Not all activities became ready.",
130+
self.startup_timeout
131+
);
132+
error!("{}", reason);
133+
self.shutdown_gracefully(&reason);
134+
return;
135+
}
136+
if self.wait_next_ready().is_err() {
137+
// An error here (like a timeout on receive) can also be a startup failure.
138+
let reason = alloc::format!(
139+
"Failed to receive ready signal from all activities within startup timeout {:?}.",
140+
self.startup_timeout
141+
);
142+
error!("{}", reason);
143+
self.shutdown_gracefully(&reason);
144+
return;
145+
}
124146
}
125147

126148
// Loop the FEO task chain

0 commit comments

Comments
 (0)