Skip to content

Commit db7643f

Browse files
committed
rch: io channel
A channel for streaming binary data with receive verification.
1 parent 9e856e0 commit db7643f

File tree

6 files changed

+1623
-0
lines changed

6 files changed

+1623
-0
lines changed

remoc/src/rch/io/mod.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
//! A channel for streaming binary data with receive verification.
2+
//!
3+
//! This channel provides [`AsyncWrite`](tokio::io::AsyncWrite) and
4+
//! [`AsyncRead`](tokio::io::AsyncRead) implementations for streaming binary data
5+
//! between endpoints. The receiver can verify that all transmitted data has been
6+
//! received completely, ensuring data integrity.
7+
//!
8+
//! # Modes
9+
//!
10+
//! The channel supports two modes depending on whether the total data size is known upfront:
11+
//!
12+
//! - **Known size** ([`sized`]): The sender specifies the exact number of bytes to transmit.
13+
//! The receiver knows the size in advance via [`Receiver::size`] and verifies that the
14+
//! received data matches.
15+
//!
16+
//! - **Unknown size** ([`channel`]): The sender can write any amount of data.
17+
//! The receiver learns the final size only upon completion and verifies integrity.
18+
//!
19+
//! In both cases, the receiver returns an error if the received byte count does not match
20+
//! the expected size.
21+
//!
22+
//! # Completion
23+
//!
24+
//! - **Known size**: Calling [`shutdown`](tokio::io::AsyncWriteExt::shutdown) is optional.
25+
//! The receiver uses the pre-announced size to determine completion.
26+
//! However, [`flush`](tokio::io::AsyncWriteExt::flush) must be called before dropping
27+
//! the sender to ensure all pending data is transmitted, as per the standard
28+
//! [`AsyncWrite`](tokio::io::AsyncWrite) contract.
29+
//!
30+
//! - **Unknown size**: Calling [`shutdown`](tokio::io::AsyncWriteExt::shutdown) is **required**.
31+
//! This sends the final byte count to the receiver. If the sender is dropped without calling
32+
//! shutdown, the receiver will return an [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error.
33+
//!
34+
//! # Remote constraint
35+
//!
36+
//! At least one half of this channel must be sent to a remote endpoint.
37+
//! Using both halves locally will cause operations to block indefinitely.
38+
//!
39+
//! # Example: Known size
40+
//!
41+
//! Use [`sized`] when the data size is known upfront (e.g., file transfer with known file size).
42+
//!
43+
//! ```
44+
//! use remoc::prelude::*;
45+
//! use tokio::io::{AsyncWriteExt, AsyncReadExt};
46+
//!
47+
//! // This would be run on the client.
48+
//! async fn client(mut tx: rch::base::Sender<rch::io::Receiver>) {
49+
//! let (mut io_tx, io_rx) = rch::io::sized(11);
50+
//!
51+
//! // The sender knows the expected size.
52+
//! assert_eq!(io_tx.expected_size(), Some(11));
53+
//!
54+
//! // Send receiver to server.
55+
//! tx.send(io_rx).await.unwrap();
56+
//!
57+
//! // Write data.
58+
//! io_tx.write_all(b"hello world").await.unwrap();
59+
//! io_tx.shutdown().await.unwrap();
60+
//! // For sized channels, this would be sufficient:
61+
//! // io_tx.flush().await.unwrap();
62+
//! }
63+
//!
64+
//! // This would be run on the server.
65+
//! async fn server(mut rx: rch::base::Receiver<rch::io::Receiver>) {
66+
//! let mut io_rx = rx.recv().await.unwrap().unwrap();
67+
//!
68+
//! // The receiver knows the size in advance.
69+
//! assert_eq!(io_rx.size(), Some(11));
70+
//!
71+
//! // Read data.
72+
//! let mut buf = Vec::new();
73+
//! io_rx.read_to_end(&mut buf).await.unwrap();
74+
//! assert_eq!(buf, b"hello world");
75+
//! }
76+
//! # tokio_test::block_on(remoc::doctest::client_server(client, server));
77+
//! ```
78+
//!
79+
//! # Example: Unknown size
80+
//!
81+
//! Use [`channel`] when the data size is not known upfront (e.g., streaming or compressed data).
82+
//! **Calling shutdown is required** to signal completion and send the final size to the receiver.
83+
//!
84+
//! ```
85+
//! use remoc::prelude::*;
86+
//! use tokio::io::{AsyncWriteExt, AsyncReadExt};
87+
//!
88+
//! // This would be run on the client.
89+
//! async fn client(mut tx: rch::base::Sender<rch::io::Receiver>) {
90+
//! let (mut io_tx, io_rx) = rch::io::channel();
91+
//!
92+
//! // Size is unknown.
93+
//! assert_eq!(io_tx.expected_size(), None);
94+
//!
95+
//! // Send receiver to server.
96+
//! tx.send(io_rx).await.unwrap();
97+
//!
98+
//! // Write data in chunks (size determined at runtime).
99+
//! io_tx.write_all(b"streaming ").await.unwrap();
100+
//! io_tx.write_all(b"data").await.unwrap();
101+
//!
102+
//! // REQUIRED: shutdown sends the final size to the receiver.
103+
//! // Without this, the receiver will fail with UnexpectedEof.
104+
//! io_tx.shutdown().await.unwrap();
105+
//! }
106+
//!
107+
//! // This would be run on the server.
108+
//! async fn server(mut rx: rch::base::Receiver<rch::io::Receiver>) {
109+
//! let mut io_rx = rx.recv().await.unwrap().unwrap();
110+
//!
111+
//! // Size is unknown until EOF.
112+
//! assert_eq!(io_rx.size(), None);
113+
//!
114+
//! // Read all data.
115+
//! let mut buf = Vec::new();
116+
//! io_rx.read_to_end(&mut buf).await.unwrap();
117+
//! assert_eq!(buf, b"streaming data");
118+
//!
119+
//! // After EOF, size becomes known.
120+
//! assert_eq!(io_rx.size(), Some(14));
121+
//! }
122+
//! # tokio_test::block_on(remoc::doctest::client_server(client, server));
123+
//! ```
124+
125+
use serde::{Deserialize, Serialize};
126+
127+
use super::{bin, oneshot};
128+
129+
mod receiver;
130+
mod sender;
131+
132+
pub use receiver::Receiver;
133+
pub use sender::Sender;
134+
135+
use sender::SizeMode;
136+
137+
/// Internal enum to track size information on the receiver side.
138+
#[derive(Debug, Serialize, Deserialize)]
139+
pub(super) enum SizeInfo {
140+
/// Size was known at channel creation.
141+
Determined(u64),
142+
/// Size will be received when sender shuts down.
143+
Undetermined(oneshot::Receiver<u64, crate::codec::Default>),
144+
}
145+
146+
/// Creates a new I/O channel with unknown size.
147+
///
148+
/// The sender must call [`AsyncWriteExt::shutdown`](tokio::io::AsyncWriteExt::shutdown)
149+
/// when done writing to signal completion.
150+
/// The receiver cannot know the size in advance (returns `None` from [`Receiver::size`]).
151+
///
152+
/// Both ends can be sent to remote endpoints.
153+
pub fn channel() -> (Sender, Receiver) {
154+
let (bin_tx, bin_rx) = bin::channel();
155+
let (size_tx, size_rx) = oneshot::channel();
156+
157+
let sender = Sender::new(bin_tx, SizeMode::Unknown(size_tx));
158+
let receiver = Receiver::new(bin_rx, SizeInfo::Undetermined(size_rx));
159+
160+
(sender, receiver)
161+
}
162+
163+
/// Creates a new I/O channel with known size.
164+
///
165+
/// The sender is expected to write exactly `size` bytes.
166+
/// Attempting to write more will result in an error.
167+
/// Calling [`flush`](tokio::io::AsyncWriteExt::flush) before dropping ensures all data is sent.
168+
/// If [`shutdown`](tokio::io::AsyncWriteExt::shutdown) is called, it verifies that exactly
169+
/// `size` bytes were written.
170+
///
171+
/// The receiver can query the size via [`Receiver::size`], which returns `Some(size)`.
172+
///
173+
/// Both ends can be sent to remote endpoints.
174+
pub fn sized(size: u64) -> (Sender, Receiver) {
175+
let (bin_tx, bin_rx) = bin::channel();
176+
177+
let sender = Sender::new(bin_tx, SizeMode::Known(size));
178+
let receiver = Receiver::new(bin_rx, SizeInfo::Determined(size));
179+
180+
(sender, receiver)
181+
}

0 commit comments

Comments
 (0)