Skip to content

Commit 877a3cb

Browse files
committed
Fix: remove remaining function-local state in favor of CoordinatorState
1 parent cd63115 commit 877a3cb

File tree

2 files changed

+23
-25
lines changed

2 files changed

+23
-25
lines changed

binaries/coordinator/src/lib.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use dora_core::{
1616
uhlc::{self, HLC},
1717
};
1818
use dora_message::{
19-
BuildId, DataflowId, SessionId,
19+
BuildId, SessionId,
2020
cli_to_coordinator::{BuildRequest, ControlRequest},
2121
common::DaemonId,
2222
coordinator_to_cli::{ControlRequestReply, DataflowResult, LogLevel, LogMessage},
@@ -33,7 +33,7 @@ use itertools::Itertools;
3333
use log_subscriber::LogSubscriber;
3434
use run::SpawnedDataflow;
3535
use std::{
36-
collections::{BTreeMap, BTreeSet, HashMap},
36+
collections::{BTreeMap, BTreeSet},
3737
net::SocketAddr,
3838
path::PathBuf,
3939
sync::Arc,
@@ -177,7 +177,7 @@ impl DaemonConnections {
177177
self.daemons.iter().map(|r| r.key().clone())
178178
}
179179

180-
fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<DaemonId, DaemonConnection>> {
180+
fn iter_mut(&self) -> impl Iterator<Item = RefMutMulti<'_, DaemonId, DaemonConnection>> {
181181
self.daemons.iter_mut()
182182
}
183183

@@ -214,11 +214,6 @@ async fn start_inner(
214214

215215
let mut events = (abortable_events, daemon_events).merge();
216216

217-
let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
218-
let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();
219-
220-
let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
221-
222217
// Shared state for ControlServer – populated with references to the
223218
// same data the event loop owns. This is a stepping-stone; a future
224219
// refactor will move all local state into CoordinatorState directly.
@@ -231,7 +226,7 @@ async fn start_inner(
231226
archived_dataflows: Default::default(),
232227
daemon_connections: Default::default(),
233228
daemon_events_tx,
234-
abort_handle: abort_handle.clone(),
229+
abort_handle,
235230
});
236231

237232
while let Some(event) = events.next().await {
@@ -391,7 +386,8 @@ async fn start_inner(
391386

392387
if dataflow.daemons.is_empty() {
393388
// Archive finished dataflow
394-
archived_dataflows
389+
coordinator_state
390+
.archived_dataflows
395391
.entry(uuid)
396392
.or_insert_with(|| ArchivedDataflow::from(entry.get()));
397393
let mut finished_dataflow = entry.remove();
@@ -457,9 +453,13 @@ async fn start_inner(
457453
// inline. Everything else is delegated to ControlServer.
458454
match request {
459455
ControlRequest::WaitForBuild { build_id } => {
460-
if let Some(build) = running_builds.get_mut(&build_id) {
456+
if let Some(mut build) =
457+
coordinator_state.running_builds.get_mut(&build_id)
458+
{
461459
build.build_result.register(reply_sender);
462-
} else if let Some(result) = finished_builds.get_mut(&build_id) {
460+
} else if let Some(mut result) =
461+
coordinator_state.finished_builds.get_mut(&build_id)
462+
{
463463
result.register(reply_sender);
464464
} else {
465465
let _ =
@@ -595,7 +595,7 @@ async fn start_inner(
595595
level,
596596
connection,
597597
} => {
598-
if let Some(build) = running_builds.get_mut(&build_id) {
598+
if let Some(mut build) = coordinator_state.running_builds.get_mut(&build_id) {
599599
build
600600
.log_subscribers
601601
.push(LogSubscriber::new(level, connection));
@@ -670,7 +670,7 @@ async fn start_inner(
670670
}
671671
}
672672
} else if let Some(build_id) = &message.build_id {
673-
if let Some(build) = running_builds.get_mut(build_id) {
673+
if let Some(mut build) = coordinator_state.running_builds.get_mut(build_id) {
674674
if build.log_subscribers.is_empty() {
675675
// buffer log message until there are subscribers
676676
build.buffered_log_messages.push(message);
@@ -701,8 +701,8 @@ async fn start_inner(
701701
build_id,
702702
daemon_id,
703703
result,
704-
} => match running_builds.get_mut(&build_id) {
705-
Some(build) => {
704+
} => match coordinator_state.running_builds.get_mut(&build_id) {
705+
Some(mut build) => {
706706
build.pending_build_results.remove(&daemon_id);
707707
match result {
708708
Ok(()) => {}
@@ -712,7 +712,8 @@ async fn start_inner(
712712
};
713713
if build.pending_build_results.is_empty() {
714714
tracing::info!("dataflow build finished: `{build_id}`");
715-
let mut build = running_builds.remove(&build_id).unwrap();
715+
let (build_id, mut build) =
716+
coordinator_state.running_builds.remove(&build_id).unwrap();
716717
let result = if build.errors.is_empty() {
717718
Ok(())
718719
} else {
@@ -723,7 +724,9 @@ async fn start_inner(
723724
ControlRequestReply::DataflowBuildFinished { build_id, result },
724725
));
725726

726-
finished_builds.insert(build_id, build.build_result);
727+
coordinator_state
728+
.finished_builds
729+
.insert(build_id, build.build_result);
727730
}
728731
}
729732
None => {

binaries/coordinator/src/state.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
1-
use std::{
2-
collections::{BTreeMap, HashMap},
3-
sync::Arc,
4-
};
1+
use std::{collections::BTreeMap, sync::Arc};
52

63
use dashmap::DashMap;
74
use dora_core::uhlc::HLC;
85
use dora_message::{
9-
BuildId, DataflowId, common::DaemonId, coordinator_to_daemon::CoordinatorToDaemonClient,
10-
daemon_to_coordinator::DataflowDaemonResult,
6+
BuildId, DataflowId, common::DaemonId, daemon_to_coordinator::DataflowDaemonResult,
117
};
128
use tokio::sync::mpsc;
13-
use uuid::Uuid;
149

1510
use crate::{
1611
ArchivedDataflow, CachedResult, DaemonConnections, Event, RunningBuild, RunningDataflow,

0 commit comments

Comments
 (0)