
Start adding tarpc service and server to coordinator #1345

Open
phil-opp wants to merge 40 commits into main from tarpc

Conversation

phil-opp (Collaborator) commented on Feb 10, 2026:

Add shared CoordinatorState struct for keeping state. Use DashMap for concurrent maps.

Not all messages use the proper tarpc request handling yet. This will be fixed in a follow-up commit. Same goes for the manual (de)serialization.

Alternative to #1288

Implementation notes:

Message Format

  • The message format is defined by the new CliControl trait in the message crate (cli_to_coordinator); a sketch of the general shape of such a trait follows this list
  • The ControlRequestReply enum is no longer needed, so it is removed
  • Most of the ControlRequest variants are no longer needed because they are now part of the CliControl trait. I removed these variants and renamed the enum to LegacyControlRequest
  • Since the message format has changed, this PR will require a minor version bump of the dora-message crate
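
For readers unfamiliar with tarpc, a service trait of this kind looks roughly like the sketch below. This is not the actual CliControl definition from this PR: the method names, argument types, and the DataflowListEntry type are placeholder assumptions; only the #[tarpc::service] mechanism itself comes from tarpc.

// Rough sketch of a tarpc service definition; the real `CliControl` trait
// in dora-message has different methods and types.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataflowListEntry {
    pub uuid: uuid::Uuid,
    pub name: Option<String>,
}

// `#[tarpc::service]` generates a `CliControlClient` type plus the
// server-side trait that the coordinator implements.
#[tarpc::service]
pub trait CliControl {
    /// List all dataflows known to the coordinator.
    async fn list() -> Result<Vec<DataflowListEntry>, String>;
    /// Stop a running dataflow.
    async fn stop(uuid: uuid::Uuid) -> Result<(), String>;
}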

Coordinator

  • The coordinator now provides a tarpc server at port DORA_COORDINATOR_PORT_RPC_DEFAULT
    • Most messages are handled by this server
  • The existing control server still exists at port DORA_COORDINATOR_PORT_CONTROL_DEFAULT
    • It's only used for LogSubscribe and BuildLogSubscribe messages, which are both streaming-based and cannot be represented well in tarpc
    • IMO, the proper solution to this would be using zenoh for these subscriptions. This way, we would no longer need to involve the coordinator
  • Instead of handling all requests in a single event loop, the coordinator now spawns a tokio task for each request and handles them in parallel
    • State is managed in a CoordinatorState struct, which uses DashMap fields that provide safe concurrent access; a sketch of this pattern follows this list
  • The impl CliControl for ControlServer uses the same code for the different requests as the previous event loop (with minor modifications)
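
To make the shared-state pattern concrete, here is a minimal sketch of the idea. The field names, value types, and the handle_request helper are illustrative assumptions, not the code from this PR; only CoordinatorState, DashMap, and the task-per-request approach come from the description above.

// Sketch only: a shared state struct with DashMap fields, accessed from
// per-request tokio tasks through an Arc.
use std::sync::Arc;

use dashmap::DashMap;
use uuid::Uuid;

#[derive(Default)]
pub struct CoordinatorState {
    // Each DashMap entry is locked individually, so concurrent request
    // handlers only contend when they touch the same dataflow or build.
    pub running_dataflows: DashMap<Uuid, RunningDataflow>,
    pub running_builds: DashMap<Uuid, RunningBuild>,
}

pub struct RunningDataflow { /* ... */ }
pub struct RunningBuild { /* ... */ }

// Instead of one sequential event loop, each request runs in its own task.
pub fn handle_request(state: Arc<CoordinatorState>, dataflow_id: Uuid) {
    tokio::spawn(async move {
        // `get` locks only this entry; other entries stay available to
        // other handlers. Drop the guard before any long `.await`.
        if let Some(dataflow) = state.running_dataflows.get(&dataflow_id) {
            // ... handle the request using `dataflow` ...
            let _ = &*dataflow;
        }
    });
}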

CLI

  • The CLI's main function and all execute calls are now async
  • Instead of manual serialization/deserialization and message type specifications, we can now simply call a function on the CliControlClient; a sketch of the new call pattern follows this list
  • For (build) log subscriptions, the LegacyControlRequest is used
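
As an illustration of the new call pattern, here is a sketch of what an async CLI command could look like. Only connect_to_coordinator_rpc and CliControlClient appear in this PR; the list method, its return type, and the argument names are placeholder assumptions.

// Sketch of an async CLI command using the generated tarpc client.
use eyre::WrapErr;

async fn list_command(addr: std::net::IpAddr, port: u16) -> eyre::Result<()> {
    let client = connect_to_coordinator_rpc(addr, port)
        .await
        .wrap_err("failed to connect to dora coordinator")?;

    // One typed method call replaces manual serialize/send/receive/deserialize.
    let dataflows = client
        .list(tarpc::context::current())
        .await
        .wrap_err("RPC transport error")? // transport-level failure
        .map_err(|e| eyre::eyre!(e))?; // coordinator-side error string

    for dataflow in dataflows {
        println!("{dataflow:?}");
    }
    Ok(())
}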

Note: I used AI to generate some of the code, but I reviewed all generated code manually. I also improved and simplified the generated code.

phil-opp changed the title from "Start adding tarpc service and server" to "Start adding tarpc service and server to coordinator" on Feb 10, 2026
phil-opp (Collaborator, Author) commented:

@claude Can you look into the failure of the CI/Examples test? It looks like the multiple daemons example is not finishing properly. It works locally for me though.

claude bot commented on Feb 11, 2026:

Claude finished @phil-opp's task in 5m 14s.


CI/Examples Test Failure - Fixed ✅

Summary: The multiple daemons example was failing in CI due to a panic at the FFI boundary when using blocking_send() on a thread with lingering Tokio runtime context.

Root Cause

The runtime-node was crashing with Signal(6) - SIGABRT due to "panic in a function that cannot unwind" at the FFI boundary. Here's the chain of events:

  1. In binaries/runtime/src/operator/shared_lib.rs:40-44, a current_thread Tokio runtime is created to download shared libraries
  2. Even after this runtime is dropped, Tokio's thread-local state persists on the thread
  3. When the operator callback tries to send output via events_tx.blocking_send() (line 155), Tokio detects it's being called from a thread with runtime context
  4. blocking_send() panics when called from within a Tokio runtime context
  5. This panic propagates through the FFI boundary (ArcDynFn1 callback), which is extern "C" and cannot unwind
  6. Rust aborts the process with SIGABRT

The Problem Code

// Creates current_thread runtime that leaves thread-local state
let rt = tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()?;
rt.block_on(download_file(source, target_path))
// rt is dropped, but thread-local state remains!

// Later, this panics because Tokio detects runtime context
self.events_tx.blocking_send(event)  // <-- PANICS

The Fix

Changed all blocking_send() calls to try_send() in binaries/runtime/src/operator/shared_lib.rs. The try_send() method doesn't require blocking and works correctly regardless of Tokio runtime context.

Changes made:

  • Line 155: blocking_send → try_send (in output closure called from FFI)
  • Line 79: blocking_send → try_send (Finished event)
  • Line 82: blocking_send → try_send (Error event)
  • Line 85: blocking_send → try_send (Panic event)
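
For reference, the replacement pattern looks roughly like this (a sketch, not the exact code from the fix commit). Note that with try_send a full channel surfaces as an error to handle rather than as backpressure.

// Sketch: `try_send` never blocks, so it cannot trigger Tokio's panic
// about blocking from within a runtime context.
if let Err(err) = self.events_tx.try_send(event) {
    // A full or closed channel is reported instead of blocking the thread.
    tracing::warn!("failed to forward operator event: {err}");
}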

Commit

Pushed fix in commit a4d3ec1. CI should now pass for the multiple daemons example.


phil-opp marked this pull request as ready for review on February 11, 2026, 17:05

haixuanTao (Collaborator) commented:

@sjfhsjfh @Mivik, could you guys take a look at this? :)

haixuanTao (Collaborator) left a review:

Thanks for the solid refactor! The DashMap lock discipline is well done and the insert-before-spawn pattern is excellent. I tested the full CLI lifecycle (up/check/list/start/stop/logs/build/destroy) on macOS and everything works correctly — including log streaming via both dora start and dora run.

Left a few comments on things that caught my eye.

    addr: IpAddr,
    _control_port: u16,
) -> eyre::Result<CliControlClient> {
    let rpc_port = DORA_COORDINATOR_PORT_RPC_DEFAULT;

The _control_port parameter is accepted but silently ignored — the RPC port is always hardcoded to DORA_COORDINATOR_PORT_RPC_DEFAULT (6013). This means any user passing --coordinator-port will have it silently discarded for all tarpc operations.

Suggestion — either derive the RPC port from the control port:

pub(crate) async fn connect_to_coordinator_rpc(
    addr: IpAddr,
    control_port: u16,
) -> eyre::Result<CliControlClient> {
    // RPC port is control port + 1 by convention
    let rpc_port = if control_port == DORA_COORDINATOR_PORT_CONTROL_DEFAULT {
        DORA_COORDINATOR_PORT_RPC_DEFAULT
    } else {
        control_port + 1
    };
    // ...
}

Or add a dedicated --coordinator-rpc-port CLI argument to CoordinatorOptions so users can configure both ports independently.
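
A sketch of that second option, assuming the two port constants are in scope; the exact field set of CoordinatorOptions and the doc comments here are placeholder assumptions:

// Sketch: expose the tarpc port as its own CLI argument.
#[derive(clap::Args)]
pub struct CoordinatorOptions {
    /// Address of the dora coordinator.
    #[clap(long)]
    pub coordinator_addr: std::net::IpAddr,
    /// Port of the coordinator's legacy control server.
    #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
    pub coordinator_port: u16,
    /// Port of the coordinator's tarpc server.
    #[clap(long, default_value_t = DORA_COORDINATOR_PORT_RPC_DEFAULT)]
    pub coordinator_rpc_port: u16,
}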

  tracing::info!("dataflow build finished: `{build_id}`");
- let mut build = running_builds.remove(&build_id).unwrap();
+ let (build_id, mut build) =
+     coordinator_state.running_builds.remove(&build_id).unwrap();

This .unwrap() can panic and crash the entire coordinator. While the preceding get_mut() succeeded, the RefMut guard was dropped when we entered this if block, so another concurrent path could theoretically remove the entry in between (the tarpc server spawns handlers concurrently).

Even if that's unlikely today, a panic here takes down all running dataflows. Suggestion:

let Some((build_id, mut build)) =
    coordinator_state.running_builds.remove(&build_id)
else {
    tracing::warn!("build {build_id} was removed while finalizing results");
    continue;
};

let client = connect_to_coordinator_rpc(self.coordinator_addr, self.coordinator_port)
    .await
    .map_err(|_| eyre!("Failed to connect to coordinator"))?;

.map_err(|_| ...) discards the underlying connection error, making it hard to debug why a connection failed (refused? timeout? DNS?). Other commands like stop.rs and logs.rs use .wrap_err() which preserves the cause chain.

Suggestion:

let client = connect_to_coordinator_rpc(self.coordinator_addr, self.coordinator_port)
    .await
    .wrap_err("failed to connect to dora coordinator")?;

future
    .await
    .context("RPC transport error")?
    .map_err(|e| eyre::eyre!(e))

Two issues with error reporting here:

  1. Transport errors lose detail: .context("RPC transport error") wraps the error but the message is generic — when the coordinator is down you get "RPC transport error" without knowing if it was connection refused, timeout, or network unreachable.

  2. Application errors lack operation context: .map_err(|e| eyre::eyre!(e)) converts the coordinator's error String into an eyre Report but doesn't say which operation failed. Callers that don't add .wrap_err() produce unhelpful messages.

Some callers do add context (e.g. stop.rs uses .wrap_err("...")), but others don't (e.g. list.rs:81: rpc(client.get_node_info(...))).

One option is to accept an operation label:

pub(crate) async fn rpc<T, E: std::error::Error + Send + Sync + 'static>(
    future: impl Future<Output = Result<Result<T, String>, E>>,
    operation: &str,
) -> eyre::Result<T> {
    future
        .await
        .wrap_err_with(|| format!("RPC transport error during {operation}"))?
        .map_err(|e| eyre::eyre!("{operation} failed: {e}"))
}

Alternatively, just ensure every call site wraps with .wrap_err().
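
For the first variant, a call site would then look something like this. This is a sketch: get_node_info comes from the list.rs example above, but its arguments are elided there, so the node_id parameter here is a hypothetical placeholder.

// Sketch: pass an operation label so both transport and application
// errors say what the CLI was trying to do.
let info = rpc(
    // `node_id` is a hypothetical placeholder argument.
    client.get_node_info(tarpc::context::current(), node_id),
    "get node info",
)
.await?;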
