Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/velo-transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokio-stream = { version = "0.1", optional = true, features = ["sync"] }
tokio = { workspace = true, features = ["test-util", "macros"] }
tower = "0.5"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
paste = "1"

[build-dependencies]
tonic-build = "0.13.1"
4 changes: 3 additions & 1 deletion lib/velo-transports/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Only compile proto files if grpc feature is enabled
#[cfg(feature = "grpc")]
{
tonic_build::compile_protos("proto/velo.proto")?;
tonic_build::configure()
.bytes(["velo.streaming.v1.FramedData"])
.compile_protos(&["proto/velo.proto"], &["proto"])?;
}
Ok(())
}
20 changes: 20 additions & 0 deletions lib/velo-transports/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! gRPC Transport Module
//!
//! This module provides a gRPC transport implementation using tonic for
//! bidirectional streaming. It wraps the same framing protocol used by
//! TCP/UDS transports in gRPC FramedData messages.

mod server;
mod transport;

/// Generated protobuf types for the VeloStreaming gRPC service.
#[allow(missing_docs)]
pub mod proto {
tonic::include_proto!("velo.streaming.v1");
}

pub use server::VeloStreamingService;
pub use transport::{GrpcTransport, GrpcTransportBuilder};
121 changes: 121 additions & 0 deletions lib/velo-transports/src/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! gRPC server implementation for the VeloStreaming service.
//!
//! This module implements the tonic server-side handler for bidirectional
//! streaming. Inbound `FramedData` messages are parsed using `TcpFrameCodec`
//! and routed to the appropriate adapter channel based on message type.

use bytes::Bytes;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, warn};

use crate::MessageType;
use crate::tcp::TcpFrameCodec;
use crate::transport::{ShutdownState, TransportAdapter};

use super::proto;
use super::proto::velo_streaming_server::VeloStreaming;

/// Tonic service implementation for bidirectional gRPC streaming.
///
/// Each inbound `Stream` RPC opens a bidirectional channel. Inbound frames
/// are decoded, routed through the [`TransportAdapter`], and drain-aware
/// rejection is handled by sending `ShuttingDown` frames back on the
/// response stream.
pub struct VeloStreamingService {
adapter: TransportAdapter,
shutdown_state: ShutdownState,
}

impl VeloStreamingService {
/// Create a new service instance with the given adapter and shutdown state.
pub fn new(adapter: TransportAdapter, shutdown_state: ShutdownState) -> Self {
Self {
adapter,
shutdown_state,
}
}
}

#[tonic::async_trait]
impl VeloStreaming for VeloStreamingService {
type StreamStream = ReceiverStream<Result<proto::FramedData, Status>>;

async fn stream(
&self,
request: Request<Streaming<proto::FramedData>>,
) -> Result<Response<Self::StreamStream>, Status> {
let mut inbound = request.into_inner();
let adapter = self.adapter.clone();
let shutdown_state = self.shutdown_state.clone();

// Response channel for sending frames back to the client (e.g. ShuttingDown).
let (response_tx, response_rx) = mpsc::channel::<Result<proto::FramedData, Status>>(256);

tokio::spawn(async move {
while let Ok(Some(framed_data)) = inbound.message().await {
let msg_type =
match TcpFrameCodec::parse_message_type_from_preamble(&framed_data.preamble) {
Ok(mt) => mt,
Err(e) => {
warn!("gRPC server: invalid preamble: {}", e);
continue;
}
};

// During drain: reject new Message frames with ShuttingDown,
// but always pass through Response/Ack/Event frames.
if shutdown_state.is_draining() && msg_type == MessageType::Message {
debug!("gRPC server: rejecting Message during drain (sending ShuttingDown)");
let preamble = match TcpFrameCodec::build_preamble(
MessageType::ShuttingDown,
framed_data.header.len() as u32,
0,
) {
Ok(p) => p,
Err(e) => {
warn!("gRPC server: failed to build ShuttingDown preamble: {}", e);
continue;
}
};
let reject = proto::FramedData {
preamble: preamble.to_vec(),
header: framed_data.header,
payload: Vec::new(),
};
if response_tx.send(Ok(reject)).await.is_err() {
break;
}
continue;
}

// Route to the appropriate adapter channel.
let sender = match msg_type {
MessageType::Message => &adapter.message_stream,
MessageType::Response => &adapter.response_stream,
MessageType::Ack | MessageType::Event => &adapter.event_stream,
MessageType::ShuttingDown => &adapter.response_stream,
};

if let Err(e) = sender
.send_async((
Bytes::from(framed_data.header),
Bytes::from(framed_data.payload),
))
.await
{
warn!("gRPC server: failed to route {:?} frame: {}", msg_type, e);
break;
}
}

debug!("gRPC server: inbound stream ended");
});

Ok(Response::new(ReceiverStream::new(response_rx)))
}
}
Loading
Loading