Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions examples/rust/cycle-benchmark/src/bin/cycle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ mod direct_mpsc {
recorder_ids: vec![],
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
}
}

Expand Down Expand Up @@ -245,6 +246,7 @@ mod direct_sockets {
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
endpoint: endpoint(&app_config, signalling),
activity_agent_map: app_config
.activity_worker_map()
Expand Down Expand Up @@ -337,6 +339,7 @@ mod relayed_sockets {
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
bind_address_senders: endpoints.0,
bind_address_receivers: endpoints.1,
id: agent_id,
Expand Down
5 changes: 5 additions & 0 deletions examples/rust/mini-adas/src/bin/adas_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ mod cfg {
recorder_ids: vec![],
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
}
}
}
Expand Down Expand Up @@ -144,6 +145,7 @@ mod cfg {
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
endpoint: NodeAddress::Tcp(BIND_ADDR),
activity_agent_map: activity_worker_map
.iter()
Expand Down Expand Up @@ -190,6 +192,7 @@ mod cfg {
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
endpoint: NodeAddress::UnixSocket(socket_paths().0),
activity_agent_map: activity_worker_map
.iter()
Expand Down Expand Up @@ -233,6 +236,7 @@ mod cfg {
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::Tcp(BIND_ADDR),
bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2),
id: AGENT_ID,
Expand Down Expand Up @@ -273,6 +277,7 @@ mod cfg {
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
startup_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::UnixSocket(socket_paths().0),
bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1),
id: AGENT_ID,
Expand Down
4 changes: 4 additions & 0 deletions feo/src/agent/direct/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub struct PrimaryConfig {
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// Timeout for waiting on activities to become ready during startup.
pub startup_timeout: Duration,
/// Endpoint on which the connector of the scheduler waits for connections
pub endpoint: NodeAddress,
/// Map of all activities to agent ids
Expand All @@ -72,6 +74,7 @@ impl Primary {
endpoint,
timeout,
connection_timeout,
startup_timeout,
activity_agent_map,
..
} = config;
Expand Down Expand Up @@ -145,6 +148,7 @@ impl Primary {
config.id,
cycle_time,
timeout,
startup_timeout,
activity_dependencies,
connector,
recorder_ids,
Expand Down
4 changes: 4 additions & 0 deletions feo/src/agent/direct/primary_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on activities to become ready during startup.
pub startup_timeout: Duration,
}

/// Primary agent
Expand All @@ -62,6 +64,7 @@ impl Primary {
activity_dependencies,
recorder_ids,
timeout,
startup_timeout,
..
} = config;

Expand Down Expand Up @@ -112,6 +115,7 @@ impl Primary {
config.id,
cycle_time,
timeout,
startup_timeout,
activity_dependencies,
connector,
recorder_ids,
Expand Down
4 changes: 4 additions & 0 deletions feo/src/agent/relayed/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct PrimaryConfig {
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// Timeout for waiting on activities to become ready during startup.
pub startup_timeout: Duration,
/// The socket address to which secondary agents' senders shall connect
pub bind_address_senders: NodeAddress,
/// The socket address to which secondary agents' receivers shall connect
Expand Down Expand Up @@ -80,6 +82,7 @@ impl Primary {
worker_assignments,
timeout,
connection_timeout,
startup_timeout,
worker_agent_map,
activity_worker_map,
} = config;
Expand Down Expand Up @@ -152,6 +155,7 @@ impl Primary {
id,
cycle_time,
timeout,
startup_timeout,
activity_dependencies,
connector,
recorder_ids,
Expand Down
30 changes: 26 additions & 4 deletions feo/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ pub(crate) struct Scheduler {
cycle_time: feo_time::Duration,
/// Timeout of receive function
receive_timeout: core::time::Duration,

/// Timeout for waiting on activities to become ready during startup.
startup_timeout: core::time::Duration,
/// For each activity: list of activities it depends on
activity_depends: HashMap<ActivityId, Vec<ActivityId>>,
/// Map keeping track of activity states
Expand All @@ -55,10 +56,12 @@ pub(crate) struct Scheduler {
}

impl Scheduler {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
agent_id: AgentId,
feo_cycle_time: feo_time::Duration,
receive_timeout: core::time::Duration,
startup_timeout: core::time::Duration,
activity_depends: HashMap<ActivityId, Vec<ActivityId>>,
connector: Box<dyn ConnectScheduler>,
recorder_ids: Vec<AgentId>,
Expand All @@ -84,6 +87,7 @@ impl Scheduler {
agent_id,
cycle_time: feo_cycle_time,
receive_timeout,
startup_timeout,
activity_depends,
connector,
activity_states,
Expand Down Expand Up @@ -117,10 +121,28 @@ impl Scheduler {
Self::startup_activity(activity_id, &self.recorder_ids, &mut self.connector).unwrap();
}

// Wait until all activities have returned their ready signal
// Wait until all activities have returned their ready signal, with a timeout.
let startup_start = Instant::now();
while !self.all_ready() {
self.wait_next_ready()
.expect("failed while waiting for ready signal");
if startup_start.elapsed() > self.startup_timeout {
let reason = alloc::format!(
"Startup timeout of {:?} exceeded. Not all activities became ready.",
self.startup_timeout
);
error!("{}", reason);
self.shutdown_gracefully(&reason);
return;
}
if self.wait_next_ready().is_err() {
// An error here (like a timeout on receive) can also be a startup failure.
let reason = alloc::format!(
"Failed to receive ready signal from all activities within startup timeout {:?}.",
self.startup_timeout
);
error!("{}", reason);
self.shutdown_gracefully(&reason);
return;
}
}

// Loop the FEO task chain
Expand Down
Loading