Skip to content

Commit 7b00cc7

Browse files
committed
rch::io: use proper codec
1 parent db7643f commit 7b00cc7

File tree

3 files changed

+72
-34
lines changed

3 files changed

+72
-34
lines changed

remoc/src/rch/io/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
use serde::{Deserialize, Serialize};
126126

127127
use super::{bin, oneshot};
128+
use crate::codec;
128129

129130
mod receiver;
130131
mod sender;
@@ -136,11 +137,13 @@ use sender::SizeMode;
136137

137138
/// Internal enum to track size information on the receiver side.
138139
#[derive(Debug, Serialize, Deserialize)]
139-
pub(super) enum SizeInfo {
140+
#[serde(bound(serialize = "Codec: codec::Codec"))]
141+
#[serde(bound(deserialize = "Codec: codec::Codec"))]
142+
pub(super) enum SizeInfo<Codec> {
140143
/// Size was known at channel creation.
141144
Determined(u64),
142145
/// Size will be received when sender shuts down.
143-
Undetermined(oneshot::Receiver<u64, crate::codec::Default>),
146+
Undetermined(oneshot::Receiver<u64, Codec>),
144147
}
145148

146149
/// Creates a new I/O channel with unknown size.
@@ -150,7 +153,10 @@ pub(super) enum SizeInfo {
150153
/// The receiver cannot know the size in advance (returns `None` from [`Receiver::size`]).
151154
///
152155
/// Both ends can be sent to remote endpoints.
153-
pub fn channel() -> (Sender, Receiver) {
156+
pub fn channel<Codec>() -> (Sender<Codec>, Receiver<Codec>)
157+
where
158+
Codec: codec::Codec,
159+
{
154160
let (bin_tx, bin_rx) = bin::channel();
155161
let (size_tx, size_rx) = oneshot::channel();
156162

@@ -171,7 +177,10 @@ pub fn channel() -> (Sender, Receiver) {
171177
/// The receiver can query the size via [`Receiver::size`], which returns `Some(size)`.
172178
///
173179
/// Both ends can be sent to remote endpoints.
174-
pub fn sized(size: u64) -> (Sender, Receiver) {
180+
pub fn sized<Codec>(size: u64) -> (Sender<Codec>, Receiver<Codec>)
181+
where
182+
Codec: codec::Codec,
183+
{
175184
let (bin_tx, bin_rx) = bin::channel();
176185

177186
let sender = Sender::new(bin_tx, SizeMode::Known(size));

remoc/src/rch/io/receiver.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ use tokio::io::AsyncRead;
1111
use tokio_util::sync::ReusableBoxFuture;
1212

1313
use super::{SizeInfo, bin, oneshot};
14-
use crate::chmux::DataBuf;
14+
use crate::{chmux::DataBuf, codec};
1515

1616
/// An I/O channel receiver that implements [`AsyncRead`].
1717
///
1818
/// Reads binary data from the underlying channel.
1919
/// Tracks bytes read and verifies size on completion.
20-
pub struct Receiver {
20+
pub struct Receiver<Codec = codec::Default> {
2121
/// The underlying binary channel receiver. Uses Mutex for serialization with &self.
2222
bin_receiver: Mutex<Option<bin::Receiver>>,
2323
/// Size information (known or to be received). Uses Option to allow taking during EOF handling.
24-
size_info: Mutex<Option<SizeInfo>>,
24+
size_info: Mutex<Option<SizeInfo<Codec>>>,
2525
/// Total bytes read so far.
2626
bytes_read: u64,
2727
/// Current data buffer being consumed.
@@ -38,7 +38,7 @@ enum ReceiverState {
3838
VerifyingSize(ReusableBoxFuture<'static, Result<u64, io::Error>>),
3939
}
4040

41-
impl fmt::Debug for Receiver {
41+
impl<Codec> fmt::Debug for Receiver<Codec> {
4242
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4343
f.debug_struct("Receiver")
4444
.field("size", &self.size())
@@ -50,16 +50,18 @@ impl fmt::Debug for Receiver {
5050

5151
/// A receiver in transport.
5252
#[derive(Debug, Serialize, Deserialize)]
53-
pub(crate) struct TransportedReceiver {
53+
#[serde(bound(serialize = "Codec: codec::Codec"))]
54+
#[serde(bound(deserialize = "Codec: codec::Codec"))]
55+
pub(crate) struct TransportedReceiver<Codec> {
5456
/// The underlying binary receiver.
5557
bin_receiver: bin::Receiver,
5658
/// Size info for transport.
57-
size: SizeInfo,
59+
size: SizeInfo<Codec>,
5860
}
5961

60-
impl Receiver {
62+
impl<Codec> Receiver<Codec> {
6163
/// Creates a new receiver.
62-
pub(super) fn new(bin_receiver: bin::Receiver, size_info: SizeInfo) -> Self {
64+
pub(super) fn new(bin_receiver: bin::Receiver, size_info: SizeInfo<Codec>) -> Self {
6365
Self {
6466
bin_receiver: Mutex::new(Some(bin_receiver)),
6567
size_info: Mutex::new(Some(size_info)),
@@ -104,11 +106,14 @@ async fn receive_data(mut bin_receiver: bin::Receiver) -> Result<(Option<DataBuf
104106
Ok((data, bin_receiver))
105107
}
106108

107-
async fn receive_size(size_rx: oneshot::Receiver<u64, crate::codec::Default>) -> Result<u64, io::Error> {
109+
async fn receive_size<Codec: codec::Codec>(size_rx: oneshot::Receiver<u64, Codec>) -> Result<u64, io::Error> {
108110
size_rx.await.map_err(|e| io::Error::new(ErrorKind::UnexpectedEof, e.to_string()))
109111
}
110112

111-
impl Receiver {
113+
impl<Codec> Receiver<Codec>
114+
where
115+
Codec: codec::Codec,
116+
{
112117
/// Polls to complete any pending receive or verify operation.
113118
fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
114119
if let ReceiverState::Receiving(ref mut fut) = self.state {
@@ -175,7 +180,10 @@ impl Receiver {
175180
}
176181
}
177182

178-
impl AsyncRead for Receiver {
183+
impl<Codec> AsyncRead for Receiver<Codec>
184+
where
185+
Codec: codec::Codec,
186+
{
179187
fn poll_read(
180188
mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>,
181189
) -> Poll<io::Result<()>> {
@@ -232,7 +240,10 @@ impl AsyncRead for Receiver {
232240
}
233241
}
234242

235-
impl Serialize for Receiver {
243+
impl<Codec> Serialize for Receiver<Codec>
244+
where
245+
Codec: codec::Codec,
246+
{
236247
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
237248
where
238249
S: serde::Serializer,
@@ -249,16 +260,19 @@ impl Serialize for Receiver {
249260
.take()
250261
.ok_or_else(|| serde::ser::Error::custom("cannot serialize: size info already consumed"))?;
251262

252-
TransportedReceiver { bin_receiver, size }.serialize(serializer)
263+
TransportedReceiver::<Codec> { bin_receiver, size }.serialize(serializer)
253264
}
254265
}
255266

256-
impl<'de> Deserialize<'de> for Receiver {
267+
impl<'de, Codec> Deserialize<'de> for Receiver<Codec>
268+
where
269+
Codec: codec::Codec,
270+
{
257271
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
258272
where
259273
D: serde::Deserializer<'de>,
260274
{
261-
let transported = TransportedReceiver::deserialize(deserializer)?;
275+
let transported = TransportedReceiver::<Codec>::deserialize(deserializer)?;
262276
Ok(Self::new(transported.bin_receiver, transported.size))
263277
}
264278
}

remoc/src/rch/io/sender.rs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,29 @@ use tokio::io::AsyncWrite;
1212
use tokio_util::sync::ReusableBoxFuture;
1313

1414
use super::{bin, oneshot};
15+
use crate::codec;
1516

1617
/// Size handling mode for the sender.
1718
#[derive(Debug, Serialize, Deserialize)]
18-
pub(super) enum SizeMode {
19+
#[serde(bound(serialize = "Codec: codec::Codec"))]
20+
#[serde(bound(deserialize = "Codec: codec::Codec"))]
21+
pub(super) enum SizeMode<Codec> {
1922
/// Size is known (either upfront or after shutdown).
2023
Known(u64),
2124
/// Size is unknown. Will send final byte count via oneshot on shutdown,
2225
/// then transition to Known.
23-
Unknown(oneshot::Sender<u64, crate::codec::Default>),
26+
Unknown(oneshot::Sender<u64, Codec>),
2427
}
2528

2629
/// An I/O channel sender that implements [`AsyncWrite`].
2730
///
2831
/// Writes binary data to the underlying channel.
2932
/// Tracks bytes written and enforces size limits if specified.
30-
pub struct Sender {
33+
pub struct Sender<Codec = codec::Default> {
3134
/// The underlying binary channel sender. Uses Mutex for serialization with &self.
3235
bin_sender: Mutex<Option<bin::Sender>>,
3336
/// Size handling mode.
34-
size_mode: Mutex<SizeMode>,
37+
size_mode: Mutex<SizeMode<Codec>>,
3538
/// Total bytes written so far.
3639
bytes_written: u64,
3740
/// Cached chunk size from chmux sender.
@@ -42,7 +45,7 @@ pub struct Sender {
4245
sending: Option<ReusableBoxFuture<'static, Result<(bin::Sender, u64), io::Error>>>,
4346
}
4447

45-
impl fmt::Debug for Sender {
48+
impl<Codec> fmt::Debug for Sender<Codec> {
4649
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4750
f.debug_struct("Sender")
4851
.field("expected_size", &self.expected_size())
@@ -53,18 +56,20 @@ impl fmt::Debug for Sender {
5356

5457
/// A sender in transport.
5558
#[derive(Debug, Serialize, Deserialize)]
56-
pub(crate) struct TransportedSender {
59+
#[serde(bound(serialize = "Codec: codec::Codec"))]
60+
#[serde(bound(deserialize = "Codec: codec::Codec"))]
61+
pub(crate) struct TransportedSender<Codec> {
5762
/// The underlying binary sender. None if already closed.
5863
bin_sender: Option<bin::Sender>,
5964
/// Size handling mode.
60-
size_mode: SizeMode,
65+
size_mode: SizeMode<Codec>,
6166
/// Total bytes written so far.
6267
bytes_written: u64,
6368
}
6469

65-
impl Sender {
70+
impl<Codec> Sender<Codec> {
6671
/// Creates a new sender.
67-
pub(super) fn new(bin_sender: bin::Sender, size_mode: SizeMode) -> Self {
72+
pub(super) fn new(bin_sender: bin::Sender, size_mode: SizeMode<Codec>) -> Self {
6873
Self {
6974
bin_sender: Mutex::new(Some(bin_sender)),
7075
size_mode: Mutex::new(size_mode),
@@ -109,7 +114,7 @@ async fn connect_sender(mut bin_sender: bin::Sender) -> Result<(bin::Sender, usi
109114
Ok((bin_sender, chunk_size))
110115
}
111116

112-
impl Sender {
117+
impl<Codec> Sender<Codec> {
113118
/// Polls to complete any pending connect or send operations.
114119
fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
115120
if let Some(future) = &mut self.connecting {
@@ -154,7 +159,10 @@ impl Sender {
154159
}
155160
}
156161

157-
impl AsyncWrite for Sender {
162+
impl<Codec> AsyncWrite for Sender<Codec>
163+
where
164+
Codec: codec::Codec,
165+
{
158166
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
159167
let this = self.as_mut().get_mut();
160168

@@ -234,7 +242,10 @@ impl AsyncWrite for Sender {
234242
}
235243
}
236244

237-
impl Serialize for Sender {
245+
impl<Codec> Serialize for Sender<Codec>
246+
where
247+
Codec: codec::Codec,
248+
{
238249
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
239250
where
240251
S: serde::Serializer,
@@ -245,16 +256,20 @@ impl Serialize for Sender {
245256
SizeMode::Known(0), // Placeholder, sender is consumed anyway
246257
);
247258

248-
TransportedSender { bin_sender, size_mode, bytes_written: self.bytes_written }.serialize(serializer)
259+
TransportedSender::<Codec> { bin_sender, size_mode, bytes_written: self.bytes_written }
260+
.serialize(serializer)
249261
}
250262
}
251263

252-
impl<'de> Deserialize<'de> for Sender {
264+
impl<'de, Codec> Deserialize<'de> for Sender<Codec>
265+
where
266+
Codec: codec::Codec,
267+
{
253268
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
254269
where
255270
D: serde::Deserializer<'de>,
256271
{
257-
let transported = TransportedSender::deserialize(deserializer)?;
272+
let transported = TransportedSender::<Codec>::deserialize(deserializer)?;
258273

259274
Ok(Self {
260275
bin_sender: Mutex::new(transported.bin_sender),

0 commit comments

Comments
 (0)