Skip to content

Commit 3f8f700

Browse files
authored
extending shutdown to handle single agent setup in relayed mode (#19)
* extending shutdown to handle single agent setup in relayed mode * extending shutdown to handle single agent setup in relayed mode --------- Signed-off-by: ShoroukRamzy <82869891+ShoroukRamzy@users.noreply.github.com>
1 parent 46bf0f5 commit 3f8f700

File tree

4 files changed

+43
-23
lines changed

4 files changed

+43
-23
lines changed

examples/rust/mini-adas/src/config.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ pub fn agent_assignments() -> HashMap<AgentId, Vec<(WorkerId, Vec<ActivityIdAndB
133133
]
134134
.into_iter()
135135
.collect();
136-
137136
#[cfg(feature = "signalling_direct_mpsc")]
138137
let assignment = [(100.into(), vec![w40, w41, w42, w43, w44])]
139138
.into_iter()

feo/src/agent/relayed/primary.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl Primary {
184184
th.join().unwrap();
185185
}
186186
// Wait for the communication relay threads to complete their shutdown.
187-
for th in self.relay_threads.drain(..) {
187+
for th in core::mem::take(&mut self.relay_threads) {
188188
th.join().unwrap();
189189
}
190190
debug!("Primary finished!!");

feo/src/signalling/relayed/connectors/scheduler.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use std::thread::JoinHandle;
3030
pub(crate) struct SchedulerConnector<Inter: IsChannel, Intra: IsChannel> {
3131
local_workers: HashSet<WorkerId>,
3232
intra_receiver: Intra::MultiReceiver,
33-
ipc_receive_relay: PrimaryReceiveRelay<Inter, Intra>,
34-
ipc_send_relay: PrimarySendRelay<Inter>,
33+
ipc_receive_relay: Option<PrimaryReceiveRelay<Inter, Intra>>,
34+
ipc_send_relay: Option<PrimarySendRelay<Inter>>,
3535
worker_sender: Intra::MultiSender,
3636
timeout: Duration,
3737
worker_connector_builders: Option<HashMap<WorkerId, Builder<WorkerConnector>>>,
@@ -45,8 +45,8 @@ impl<Inter: IsChannel, Intra: IsChannel> SchedulerConnector<Inter, Intra> {
4545
pub fn with_fields(
4646
local_workers: HashSet<WorkerId>,
4747
intra_receiver: Intra::MultiReceiver,
48-
ipc_receive_relay: PrimaryReceiveRelay<Inter, Intra>,
49-
ipc_send_relay: PrimarySendRelay<Inter>,
48+
ipc_receive_relay: Option<PrimaryReceiveRelay<Inter, Intra>>,
49+
ipc_send_relay: Option<PrimarySendRelay<Inter>>,
5050
worker_sender: Intra::MultiSender,
5151
timeout: Duration,
5252
worker_connector_builders: Option<HashMap<WorkerId, Builder<WorkerConnector>>>,
@@ -72,7 +72,11 @@ impl<Inter: IsChannel, Intra: IsChannel> SchedulerConnector<Inter, Intra> {
7272
agent_id: AgentId,
7373
signal: Inter::ProtocolSignal,
7474
) -> Result<(), Error> {
75-
self.ipc_send_relay.send_to_agent(agent_id, signal)
75+
if let Some(relay) = self.ipc_send_relay.as_mut() {
76+
relay.send_to_agent(agent_id, signal)
77+
} else {
78+
Err(Error::ChannelNotFound(ChannelId::Agent(agent_id)))
79+
}
7680
}
7781

7882
// Relay signal onto inter-process connector
@@ -103,9 +107,13 @@ impl<Inter: IsChannel, Intra: IsChannel> SchedulerConnector<Inter, Intra> {
103107

104108
pub fn run_and_connect(&mut self) -> Result<(), Error> {
105109
debug!("Starting MixedSchedulerConnector");
106-
let receive_relay_handle = self.ipc_receive_relay.run_and_connect();
107-
self.relay_threads.push(receive_relay_handle);
108-
self.ipc_send_relay.connect()?;
110+
if let Some(relay) = self.ipc_receive_relay.as_mut() {
111+
let receive_relay_handle = relay.run_and_connect();
112+
self.relay_threads.push(receive_relay_handle);
113+
}
114+
if let Some(relay) = self.ipc_send_relay.as_mut() {
115+
relay.connect()?;
116+
}
109117
self.intra_receiver.connect_senders(self.timeout)?;
110118
self.worker_sender.connect_receivers(self.timeout)
111119
}
@@ -115,7 +123,11 @@ impl<Inter: IsChannel, Intra: IsChannel> SchedulerConnector<Inter, Intra> {
115123
}
116124

117125
fn sync_time(&mut self) -> Result<(), Error> {
118-
self.ipc_send_relay.sync_time()
126+
if let Some(relay) = self.ipc_send_relay.as_mut() {
127+
relay.sync_time()
128+
} else {
129+
Ok(())
130+
}
119131
}
120132
}
121133

@@ -131,7 +143,9 @@ impl<Inter: IsChannel, Intra: IsChannel> ConnectScheduler for SchedulerConnector
131143
fn get_connected_agent_ids(&self) -> Vec<AgentId> {
132144
let mut agent_ids: BTreeSet<_> = self.worker_agent_map.values().copied().collect();
133145
// In relayed mode, recorders are also agents we talk to.
134-
agent_ids.extend(self.ipc_send_relay.get_remote_agents());
146+
if let Some(relay) = self.ipc_send_relay.as_ref() {
147+
agent_ids.extend(relay.get_remote_agents());
148+
}
135149
agent_ids.into_iter().collect()
136150
}
137151

@@ -157,7 +171,9 @@ impl<Inter: IsChannel, Intra: IsChannel> ConnectScheduler for SchedulerConnector
157171

158172
fn broadcast_terminate(&mut self, signal: &Signal) -> Result<(), Error> {
159173
// Broadcast to remote agents via the IPC relay.
160-
self.ipc_send_relay.broadcast((*signal).into())?;
174+
if let Some(relay) = self.ipc_send_relay.as_mut() {
175+
relay.broadcast((*signal).into())?;
176+
}
161177

162178
// Also broadcast to local workers via the MPSC sender.
163179
self.worker_sender.broadcast((*signal).into())

feo/src/signalling/relayed/sockets_mpsc.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -217,22 +217,27 @@ impl<Inter: SocketChannel> scheduler::SchedulerConnector<Inter, IntraChannel> {
217217
intra_builders::multi_sender_builder(&local_worker_channels);
218218
let worker_sender = worker_sender_builder();
219219

220-
// Create an IPC relay for receiving from remote processes
221220
let channel_ids: Vec<ChannelId> = remote_agents
222221
.iter()
223222
.copied()
224223
.map(ChannelId::Agent)
225224
.collect();
226225

227-
let relay_sender_builder = local_sender_builders.remove(&relay_channel).unwrap();
228-
let relay_receiver_builder =
229-
Inter::multi_receiver_builder(channel_ids.clone(), bind_address_senders);
230-
let ipc_receive_relay =
231-
PrimaryReceiveRelay::new(relay_sender_builder, relay_receiver_builder, timeout);
232-
233-
let ipc_sender = Inter::new_multi_sender(&channel_ids, bind_address_receivers);
234-
235-
let ipc_send_relay = PrimarySendRelay::new(remote_agents, ipc_sender, timeout);
226+
// Create IPC relays only if there are remote agents to communicate with.
227+
let (ipc_receive_relay, ipc_send_relay) = if !remote_agents.is_empty() {
228+
let relay_sender_builder = local_sender_builders.remove(&relay_channel).unwrap();
229+
let relay_receiver_builder =
230+
Inter::multi_receiver_builder(channel_ids.clone(), bind_address_senders);
231+
let receive_relay =
232+
PrimaryReceiveRelay::new(relay_sender_builder, relay_receiver_builder, timeout);
233+
234+
let ipc_sender = Inter::new_multi_sender(&channel_ids, bind_address_receivers);
235+
let send_relay = PrimarySendRelay::new(remote_agents, ipc_sender, timeout);
236+
237+
(Some(receive_relay), Some(send_relay))
238+
} else {
239+
(None, None)
240+
};
236241

237242
// Create worker connector builders for local workers
238243
let worker_connector_builders: HashMap<WorkerId, Builder<WorkerConnector>> =

0 commit comments

Comments
 (0)