Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ impl Container {
already_cancelled = true;

let msg = CoordinatorMessage::Kill;
trace!("processing {msg:?}");
trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(KillSnafu)?;
},

Expand All @@ -1847,12 +1847,12 @@ impl Container {
}
};

trace!("processing {msg:?}");
trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(StdinSnafu)?;
},

Some(container_msg) = from_worker_rx.recv() => {
trace!("processing {container_msg:?}");
trace!(msg_name = container_msg.as_ref(), "processing");

match container_msg {
WorkerMessage::ExecuteCommand(resp) => {
Expand Down Expand Up @@ -2358,48 +2358,63 @@ impl Commander {
gc_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
select! {
command = command_rx.recv() => {
let Some((ack_tx, command)) = command else { break };
enum Event {
Command(Option<(oneshot::Sender<()>, DemultiplexCommand)>),

FromWorker(Option<Multiplexed<WorkerMessage>>),

// Find any channels where the receivers have been
// dropped and clear out the sending halves.
Gc,
}
use Event::*;

let event = select! {
command = command_rx.recv() => Command(command),

msg = from_worker_rx.recv() => FromWorker(msg),

_ = gc_interval.tick() => Gc,
};

match event {
Command(None) => break,
Command(Some((ack_tx, command))) => {
match command {
DemultiplexCommand::Listen(job_id, waiter) => {
trace!("adding listener for {job_id:?}");
trace!(job_id, "adding listener (many)");
let old = waiting.insert(job_id, waiter);
ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id });
}

DemultiplexCommand::ListenOnce(job_id, waiter) => {
trace!("adding listener for {job_id:?}");
trace!(job_id, "adding listener (once)");
let old = waiting_once.insert(job_id, waiter);
ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id });
}
}

ack_tx.send(()).ok(/* Don't care about it */);
},

msg = from_worker_rx.recv() => {
let Some(Multiplexed(job_id, msg)) = msg else { break };
}

FromWorker(None) => break,
FromWorker(Some(Multiplexed(job_id, msg))) => {
if let Some(waiter) = waiting_once.remove(&job_id) {
trace!("notifying listener for {job_id:?}");
trace!(job_id, "notifying listener (once)");
waiter.send(msg).ok(/* Don't care about it */);
continue;
}

if let Some(waiter) = waiting.get(&job_id) {
trace!("notifying listener for {job_id:?}");
trace!(job_id, "notifying listener (many)");
waiter.send(msg).await.ok(/* Don't care about it */);
continue;
}

warn!("no listener for {job_id:?}");
warn!(job_id, "no listener to notify");
}

// Find any channels where the receivers have been
// dropped and clear out the sending halves.
_ = gc_interval.tick() => {
Gc => {
waiting = mem::take(&mut waiting)
.into_iter()
.filter(|(_job_id, tx)| !tx.is_closed())
Expand Down
2 changes: 1 addition & 1 deletion compiler/base/orchestrator/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ macro_rules! impl_narrow_to_broad {
#[derive(Debug, Serialize, Deserialize)]
pub struct Multiplexed<T>(pub JobId, pub T);

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, strum_macros::AsRefStr)]
pub enum CoordinatorMessage {
WriteFile(WriteFileRequest),
DeleteFile(DeleteFileRequest),
Expand Down
1 change: 1 addition & 0 deletions tests/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

capture_js_log = ENV.fetch('CAPTURE_JS_LOG', 'false').casecmp?('true')
Selenium::WebDriver.logger.level = :debug if capture_js_log
Selenium::WebDriver.logger.ignore(:clear_local_storage, :clear_session_storage)

browser_options = ::Selenium::WebDriver::Firefox::Options.new
browser_options.add_argument('-headless') if ENV.fetch('HEADLESS', 'true').casecmp?('true')
Expand Down
6 changes: 0 additions & 6 deletions ui/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ pub(crate) enum Endpoint {
MacroExpansion,
MetaCrates,
MetaVersions,
MetaVersionStable,
MetaVersionBeta,
MetaVersionNightly,
MetaVersionRustfmt,
MetaVersionClippy,
MetaVersionMiri,
Evaluate,
}

Expand Down
Loading