Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions lib/velo-transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bytes = { workspace = true }
dashmap = { workspace = true }
# derive_builder = { workspace = true }
parking_lot.workspace = true
# serde = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand All @@ -45,7 +45,7 @@ socket2 = "0.6"
tokio-util = { version = "0.7", features = ["codec"] }

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.30", features = ["sched"] }
nix = { version = "0.30", features = ["sched", "net"] }

# optional dependencies
axum = { version = "0.8", optional = true }
Expand All @@ -66,6 +66,7 @@ tokio-stream = { version = "0.1", optional = true, features = ["sync"] }
tokio = { workspace = true, features = ["test-util", "macros"] }
tower = "0.5"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
paste = "1"

[build-dependencies]
tonic-build = "0.13.1"
4 changes: 3 additions & 1 deletion lib/velo-transports/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Only compile proto files if grpc feature is enabled
#[cfg(feature = "grpc")]
{
tonic_build::compile_protos("proto/velo.proto")?;
tonic_build::configure()
.bytes(["velo.streaming.v1.FramedData"])
.compile_protos(&["proto/velo.proto"], &["proto"])?;
}
Ok(())
}
128 changes: 128 additions & 0 deletions lib/velo-transports/docs/interface-discovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Interface Discovery & Endpoint Selection

## Problem

When a transport binds to `0.0.0.0:PORT`, the OS listens on all interfaces. But the address advertised to peers must be a routable IP, not `0.0.0.0`. On multi-NIC machines (wifi, eth0, ConnectX), we need to:

1. Discover which interfaces are available and advertise all of them
2. Let peers pick the best interface based on subnet affinity and NUMA topology

## Architecture

```
Builder Register (peer side)
│ │
▼ ▼
resolve_advertise_endpoints() parse_endpoints()
│ │
▼ ▼
discover_interfaces() select_best_endpoint()
│ (getifaddrs + sysfs) │ (NUMA + subnet matching)
▼ ▼
Vec<InterfaceEndpoint> SocketAddr
rmp_serde::to_vec() → WorkerAddress
```

The work is purely **advertisement** (what addresses to tell peers) and **selection** (which peer address to connect to). No per-interface listeners are needed.

## InterfaceEndpoint

Each discovered interface produces an `InterfaceEndpoint`:

```rust
pub struct InterfaceEndpoint {
pub name: String, // "eth0", "mlx5_0"
pub ip: String, // "192.168.1.10"
pub port: u16, // from the bound listener
pub prefix_len: u8, // CIDR prefix: 24 for /24
pub numa_node: Option<i32>, // from sysfs, None if unavailable
}
```

These are serialized as a msgpack array into the `WorkerAddress` entry for the transport key.

## Discovery

`discover_interfaces()` calls `nix::ifaddrs::getifaddrs()` and filters:

- **Included**: interfaces that are `IFF_UP` with IPv4 or IPv6 addresses
- **Excluded**: loopback (`IFF_LOOPBACK`, `127.0.0.1`, `::1`)
- **Excluded**: IPv6 link-local (`fe80::`) — scope_id routing complexity not worth it

For each interface, NUMA node is read from `/sys/class/net/<name>/device/numa_node`. Returns `None` if the file is missing (virtual interfaces, non-Linux) or contains `-1`.

Prefix length is computed from the netmask via `netmask_to_prefix_len()` — counting leading 1-bits.

## Interface Filtering

Users control which interfaces are advertised via `InterfaceFilter`:

```rust
pub enum InterfaceFilter {
All, // all UP, non-loopback (default)
ByName(String), // specific NIC, e.g. "mlx5_0"
Explicit(Vec<InterfaceEndpoint>), // manual override (testing)
}
```

Set on the builder:

```rust
TcpTransportBuilder::new()
.interface_filter(InterfaceFilter::ByName("mlx5_0".into()))
.numa_hint(1)
.build()?;
```

## Endpoint Selection

When a peer registers, `select_best_endpoint()` picks the best remote endpoint to connect to. Selection phases (first match wins):

| Phase | Criteria | Rationale |
|-------|----------|-----------|
| 1 | NUMA match + subnet match | Same NUMA node as GPU and on a reachable subnet |
| 2 | NUMA match only | Same NUMA node, even without subnet overlap |
| 3 | Subnet match | Any local interface shares a network with the remote |
| 4 | First non-loopback | Fallback to any routable address |
| 5 | First entry | Last resort |

The `numa_hint` is provided by the caller (typically from `dynamo_memory::numa::get_device_numa_node(gpu_id)`), keeping velo-transports CUDA-agnostic.

## Wire Format & Backward Compatibility

The `WorkerAddress` entry for a transport key contains either:

- **New format**: msgpack-encoded `Vec<InterfaceEndpoint>`
- **Legacy format**: UTF-8 string like `"tcp://127.0.0.1:5555"` or `"grpc://host:port"`

`parse_endpoints()` tries msgpack deserialization first, then falls back to legacy string parsing. This means new code can register peers advertising the old format and vice versa.

## Builder Pre-binding

The builder always pre-binds a `std::net::TcpListener` during `build()` to resolve port 0 to a real port before discovering interfaces. This ensures all advertised endpoints have the correct port. The pre-bound listener is passed through to the transport's `start()` method.

If `from_listener()` is used (common in tests), the provided listener's address is used directly.

## Loopback Fallback

If no non-loopback interfaces are discovered (e.g. in a container), `resolve_advertise_endpoints()` falls back to advertising loopback (`127.0.0.1` or `::1`) with a warning. Same-machine peers should prefer UDS transport anyway.

## Example: Multi-NIC Server

A machine with `eth0` (192.168.1.10/24, NUMA 0) and `mlx5_0` (10.0.0.1/16, NUMA 1):

```
Builder binds 0.0.0.0:0 → OS assigns port 45000
discover_interfaces() returns:
- eth0: 192.168.1.10, prefix=24, numa=0
- mlx5_0: 10.0.0.1, prefix=16, numa=1

WorkerAddress["tcp"] = msgpack([
{name: "eth0", ip: "192.168.1.10", port: 45000, prefix_len: 24, numa_node: 0},
{name: "mlx5_0", ip: "10.0.0.1", port: 45000, prefix_len: 16, numa_node: 1},
])
```

A peer with GPU on NUMA 1 and local interface `10.0.0.50/16` would select `10.0.0.1:45000` (phase 1: NUMA + subnet match).
20 changes: 20 additions & 0 deletions lib/velo-transports/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! gRPC Transport Module
//!
//! This module provides a gRPC transport implementation using tonic for
//! bidirectional streaming. It wraps the same framing protocol used by
//! TCP/UDS transports in gRPC FramedData messages.

mod server;
mod transport;

/// Generated protobuf types for the VeloStreaming gRPC service.
#[allow(missing_docs)]
pub mod proto {
tonic::include_proto!("velo.streaming.v1");
}

pub use server::VeloStreamingService;
pub use transport::{GrpcTransport, GrpcTransportBuilder};
121 changes: 121 additions & 0 deletions lib/velo-transports/src/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! gRPC server implementation for the VeloStreaming service.
//!
//! This module implements the tonic server-side handler for bidirectional
//! streaming. Inbound `FramedData` messages are parsed using `TcpFrameCodec`
//! and routed to the appropriate adapter channel based on message type.

use bytes::Bytes;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, warn};

use crate::MessageType;
use crate::tcp::TcpFrameCodec;
use crate::transport::{ShutdownState, TransportAdapter};

use super::proto;
use super::proto::velo_streaming_server::VeloStreaming;

/// Tonic service implementation for bidirectional gRPC streaming.
///
/// Each inbound `Stream` RPC opens a bidirectional channel. Inbound frames
/// are decoded, routed through the [`TransportAdapter`], and drain-aware
/// rejection is handled by sending `ShuttingDown` frames back on the
/// response stream.
pub struct VeloStreamingService {
adapter: TransportAdapter,
shutdown_state: ShutdownState,
}

impl VeloStreamingService {
/// Create a new service instance with the given adapter and shutdown state.
pub fn new(adapter: TransportAdapter, shutdown_state: ShutdownState) -> Self {
Self {
adapter,
shutdown_state,
}
}
}

#[tonic::async_trait]
impl VeloStreaming for VeloStreamingService {
type StreamStream = ReceiverStream<Result<proto::FramedData, Status>>;

async fn stream(
&self,
request: Request<Streaming<proto::FramedData>>,
) -> Result<Response<Self::StreamStream>, Status> {
let mut inbound = request.into_inner();
let adapter = self.adapter.clone();
let shutdown_state = self.shutdown_state.clone();

// Response channel for sending frames back to the client (e.g. ShuttingDown).
let (response_tx, response_rx) = mpsc::channel::<Result<proto::FramedData, Status>>(256);

tokio::spawn(async move {
while let Ok(Some(framed_data)) = inbound.message().await {
let msg_type =
match TcpFrameCodec::parse_message_type_from_preamble(&framed_data.preamble) {
Ok(mt) => mt,
Err(e) => {
warn!("gRPC server: invalid preamble: {}", e);
continue;
}
};

// During drain: reject new Message frames with ShuttingDown,
// but always pass through Response/Ack/Event frames.
if shutdown_state.is_draining() && msg_type == MessageType::Message {
debug!("gRPC server: rejecting Message during drain (sending ShuttingDown)");
let preamble = match TcpFrameCodec::build_preamble(
MessageType::ShuttingDown,
framed_data.header.len() as u32,
0,
) {
Ok(p) => p,
Err(e) => {
warn!("gRPC server: failed to build ShuttingDown preamble: {}", e);
continue;
}
};
let reject = proto::FramedData {
preamble: preamble.to_vec(),
header: framed_data.header,
payload: Vec::new(),
};
if response_tx.send(Ok(reject)).await.is_err() {
break;
}
continue;
}

// Route to the appropriate adapter channel.
let sender = match msg_type {
MessageType::Message => &adapter.message_stream,
MessageType::Response => &adapter.response_stream,
MessageType::Ack | MessageType::Event => &adapter.event_stream,
MessageType::ShuttingDown => &adapter.response_stream,
};

if let Err(e) = sender
.send_async((
Bytes::from(framed_data.header),
Bytes::from(framed_data.payload),
))
.await
{
warn!("gRPC server: failed to route {:?} frame: {}", msg_type, e);
break;
}
}

debug!("gRPC server: inbound stream ended");
});

Ok(Response::new(ReceiverStream::new(response_rx)))
}
}
Loading
Loading