Skip to content

Commit 234a0d2

Browse files
feat(yamux): future-proof public API
With this set of changes, we prepare the public API of `libp2p-yamux` to be as minimal as possible and allow for upgrades of the underlying `yamux` library in patch releases. Related: #3013. Pull-Request: #3908.
1 parent 25958a2 commit 234a0d2

File tree

2 files changed

+81
-150
lines changed

2 files changed

+81
-150
lines changed

muxers/yamux/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
- Remove deprecated items.
77
See [PR 3897].
88

9+
- Remove `Incoming`, `LocalIncoming` and `LocalConfig` as well as anything from the underlying `yamux` crate from the public API.
10+
See [PR 3908].
11+
912
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
1013
[PR 3897]: https://github.com/libp2p/rust-libp2p/pull/3897
14+
[PR 3908]: https://github.com/libp2p/rust-libp2p/pull/3908
1115

1216
## 0.43.1
1317

muxers/yamux/src/lib.rs

Lines changed: 77 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,11 @@
2222
2323
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2424

25-
use futures::{
26-
future,
27-
prelude::*,
28-
ready,
29-
stream::{BoxStream, LocalBoxStream},
30-
};
25+
use futures::{future, prelude::*, ready, stream::BoxStream};
3126
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
3227
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
3328
use std::collections::VecDeque;
29+
use std::io::{IoSlice, IoSliceMut};
3430
use std::task::Waker;
3531
use std::{
3632
fmt, io, iter, mem,
@@ -41,23 +37,25 @@ use thiserror::Error;
4137
use yamux::ConnectionError;
4238

4339
/// A Yamux connection.
44-
pub struct Muxer<S> {
40+
pub struct Muxer<C> {
4541
/// The [`futures::stream::Stream`] of incoming substreams.
46-
incoming: S,
42+
incoming: BoxStream<'static, Result<yamux::Stream, yamux::ConnectionError>>,
4743
/// Handle to control the connection.
4844
control: yamux::Control,
4945
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote.
5046
///
51-
/// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the
47+
/// The only way how yamux can make progress is by driving the stream. However, the
5248
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via
5349
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general
5450
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
5551
///
5652
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
5753
/// Once the buffer is full, new inbound streams are dropped.
58-
inbound_stream_buffer: VecDeque<yamux::Stream>,
54+
inbound_stream_buffer: VecDeque<Stream>,
5955
/// Waker to be called when new inbound streams are available.
6056
inbound_stream_waker: Option<Waker>,
57+
58+
_phantom: std::marker::PhantomData<C>,
6159
}
6260

6361
const MAX_BUFFERED_INBOUND_STREAMS: usize = 25;
@@ -68,7 +66,7 @@ impl<S> fmt::Debug for Muxer<S> {
6866
}
6967
}
7068

71-
impl<C> Muxer<Incoming<C>>
69+
impl<C> Muxer<C>
7270
where
7371
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
7472
{
@@ -78,43 +76,20 @@ where
7876
let ctrl = conn.control();
7977

8078
Self {
81-
incoming: Incoming {
82-
stream: yamux::into_stream(conn).err_into().boxed(),
83-
_marker: std::marker::PhantomData,
84-
},
85-
control: ctrl,
86-
inbound_stream_buffer: VecDeque::default(),
87-
inbound_stream_waker: None,
88-
}
89-
}
90-
}
91-
92-
impl<C> Muxer<LocalIncoming<C>>
93-
where
94-
C: AsyncRead + AsyncWrite + Unpin + 'static,
95-
{
96-
/// Create a new Yamux connection (which is ![`Send`]).
97-
fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
98-
let conn = yamux::Connection::new(io, cfg, mode);
99-
let ctrl = conn.control();
100-
101-
Self {
102-
incoming: LocalIncoming {
103-
stream: yamux::into_stream(conn).err_into().boxed_local(),
104-
_marker: std::marker::PhantomData,
105-
},
79+
incoming: yamux::into_stream(conn).err_into().boxed(),
10680
control: ctrl,
10781
inbound_stream_buffer: VecDeque::default(),
10882
inbound_stream_waker: None,
83+
_phantom: Default::default(),
10984
}
11085
}
11186
}
11287

113-
impl<S> StreamMuxer for Muxer<S>
88+
impl<C> StreamMuxer for Muxer<C>
11489
where
115-
S: Stream<Item = Result<yamux::Stream, Error>> + Unpin,
90+
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
11691
{
117-
type Substream = yamux::Stream;
92+
type Substream = Stream;
11893
type Error = Error;
11994

12095
fn poll_inbound(
@@ -136,6 +111,7 @@ where
136111
) -> Poll<Result<Self::Substream, Self::Error>> {
137112
Pin::new(&mut self.control)
138113
.poll_open_stream(cx)
114+
.map_ok(Stream)
139115
.map_err(Error)
140116
}
141117

@@ -148,7 +124,7 @@ where
148124
let inbound_stream = ready!(this.poll_inner(cx))?;
149125

150126
if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
151-
log::warn!("dropping {inbound_stream} because buffer is full");
127+
log::warn!("dropping {} because buffer is full", inbound_stream.0);
152128
drop(inbound_stream);
153129
} else {
154130
this.inbound_stream_buffer.push_back(inbound_stream);
@@ -168,7 +144,9 @@ where
168144
return Poll::Ready(Ok(()));
169145
}
170146

171-
while let Poll::Ready(maybe_inbound_stream) = self.incoming.poll_next_unpin(c)? {
147+
while let Poll::Ready(maybe_inbound_stream) =
148+
self.incoming.poll_next_unpin(c).map_err(Error)?
149+
{
172150
match maybe_inbound_stream {
173151
Some(inbound_stream) => mem::drop(inbound_stream),
174152
None => return Poll::Ready(Ok(())),
@@ -179,14 +157,64 @@ where
179157
}
180158
}
181159

182-
impl<S> Muxer<S>
160+
/// A stream produced by the yamux multiplexer.
161+
#[derive(Debug)]
162+
pub struct Stream(yamux::Stream);
163+
164+
impl AsyncRead for Stream {
165+
fn poll_read(
166+
mut self: Pin<&mut Self>,
167+
cx: &mut Context<'_>,
168+
buf: &mut [u8],
169+
) -> Poll<io::Result<usize>> {
170+
Pin::new(&mut self.0).poll_read(cx, buf)
171+
}
172+
173+
fn poll_read_vectored(
174+
mut self: Pin<&mut Self>,
175+
cx: &mut Context<'_>,
176+
bufs: &mut [IoSliceMut<'_>],
177+
) -> Poll<io::Result<usize>> {
178+
Pin::new(&mut self.0).poll_read_vectored(cx, bufs)
179+
}
180+
}
181+
182+
impl AsyncWrite for Stream {
183+
fn poll_write(
184+
mut self: Pin<&mut Self>,
185+
cx: &mut Context<'_>,
186+
buf: &[u8],
187+
) -> Poll<io::Result<usize>> {
188+
Pin::new(&mut self.0).poll_write(cx, buf)
189+
}
190+
191+
fn poll_write_vectored(
192+
mut self: Pin<&mut Self>,
193+
cx: &mut Context<'_>,
194+
bufs: &[IoSlice<'_>],
195+
) -> Poll<io::Result<usize>> {
196+
Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
197+
}
198+
199+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
200+
Pin::new(&mut self.0).poll_flush(cx)
201+
}
202+
203+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
204+
Pin::new(&mut self.0).poll_close(cx)
205+
}
206+
}
207+
208+
impl<C> Muxer<C>
183209
where
184-
S: Stream<Item = Result<yamux::Stream, Error>> + Unpin,
210+
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
185211
{
186-
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, Error>> {
212+
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream, Error>> {
187213
self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
188214
let stream = maybe_stream
189-
.transpose()?
215+
.transpose()
216+
.map_err(Error)?
217+
.map(Stream)
190218
.ok_or(Error(ConnectionError::Closed))?;
191219

192220
Ok(stream)
@@ -241,10 +269,6 @@ impl WindowUpdateMode {
241269
}
242270
}
243271

244-
/// The yamux configuration for upgrading I/O resources which are ![`Send`].
245-
#[derive(Clone)]
246-
pub struct LocalConfig(Config);
247-
248272
impl Config {
249273
/// Creates a new `YamuxConfig` in client mode, regardless of whether
250274
/// it will be used for an inbound or outbound upgrade.
@@ -288,12 +312,6 @@ impl Config {
288312
self.inner.set_window_update_mode(mode.0);
289313
self
290314
}
291-
292-
/// Converts the config into a [`LocalConfig`] for use with upgrades
293-
/// of I/O streams that are ![`Send`].
294-
pub fn into_local(self) -> LocalConfig {
295-
LocalConfig(self)
296-
}
297315
}
298316

299317
impl Default for Config {
@@ -315,20 +333,11 @@ impl UpgradeInfo for Config {
315333
}
316334
}
317335

318-
impl UpgradeInfo for LocalConfig {
319-
type Info = &'static str;
320-
type InfoIter = iter::Once<Self::Info>;
321-
322-
fn protocol_info(&self) -> Self::InfoIter {
323-
iter::once("/yamux/1.0.0")
324-
}
325-
}
326-
327336
impl<C> InboundUpgrade<C> for Config
328337
where
329338
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
330339
{
331-
type Output = Muxer<Incoming<C>>;
340+
type Output = Muxer<C>;
332341
type Error = io::Error;
333342
type Future = future::Ready<Result<Self::Output, Self::Error>>;
334343

@@ -338,26 +347,11 @@ where
338347
}
339348
}
340349

341-
impl<C> InboundUpgrade<C> for LocalConfig
342-
where
343-
C: AsyncRead + AsyncWrite + Unpin + 'static,
344-
{
345-
type Output = Muxer<LocalIncoming<C>>;
346-
type Error = io::Error;
347-
type Future = future::Ready<Result<Self::Output, Self::Error>>;
348-
349-
fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
350-
let cfg = self.0;
351-
let mode = cfg.mode.unwrap_or(yamux::Mode::Server);
352-
future::ready(Ok(Muxer::local(io, cfg.inner, mode)))
353-
}
354-
}
355-
356350
impl<C> OutboundUpgrade<C> for Config
357351
where
358352
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
359353
{
360-
type Output = Muxer<Incoming<C>>;
354+
type Output = Muxer<C>;
361355
type Error = io::Error;
362356
type Future = future::Ready<Result<Self::Output, Self::Error>>;
363357

@@ -367,25 +361,10 @@ where
367361
}
368362
}
369363

370-
impl<C> OutboundUpgrade<C> for LocalConfig
371-
where
372-
C: AsyncRead + AsyncWrite + Unpin + 'static,
373-
{
374-
type Output = Muxer<LocalIncoming<C>>;
375-
type Error = io::Error;
376-
type Future = future::Ready<Result<Self::Output, Self::Error>>;
377-
378-
fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
379-
let cfg = self.0;
380-
let mode = cfg.mode.unwrap_or(yamux::Mode::Client);
381-
future::ready(Ok(Muxer::local(io, cfg.inner, mode)))
382-
}
383-
}
384-
385364
/// The Yamux [`StreamMuxer`] error type.
386365
#[derive(Debug, Error)]
387-
#[error("yamux error: {0}")]
388-
pub struct Error(#[from] yamux::ConnectionError);
366+
#[error(transparent)]
367+
pub struct Error(yamux::ConnectionError);
389368

390369
impl From<Error> for io::Error {
391370
fn from(err: Error) -> Self {
@@ -395,55 +374,3 @@ impl From<Error> for io::Error {
395374
}
396375
}
397376
}
398-
399-
/// The [`futures::stream::Stream`] of incoming substreams.
400-
pub struct Incoming<T> {
401-
stream: BoxStream<'static, Result<yamux::Stream, Error>>,
402-
_marker: std::marker::PhantomData<T>,
403-
}
404-
405-
impl<T> fmt::Debug for Incoming<T> {
406-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407-
f.write_str("Incoming")
408-
}
409-
}
410-
411-
/// The [`futures::stream::Stream`] of incoming substreams (`!Send`).
412-
pub struct LocalIncoming<T> {
413-
stream: LocalBoxStream<'static, Result<yamux::Stream, Error>>,
414-
_marker: std::marker::PhantomData<T>,
415-
}
416-
417-
impl<T> fmt::Debug for LocalIncoming<T> {
418-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
419-
f.write_str("LocalIncoming")
420-
}
421-
}
422-
423-
impl<T> Stream for Incoming<T> {
424-
type Item = Result<yamux::Stream, Error>;
425-
426-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427-
self.stream.as_mut().poll_next_unpin(cx)
428-
}
429-
430-
fn size_hint(&self) -> (usize, Option<usize>) {
431-
self.stream.size_hint()
432-
}
433-
}
434-
435-
impl<T> Unpin for Incoming<T> {}
436-
437-
impl<T> Stream for LocalIncoming<T> {
438-
type Item = Result<yamux::Stream, Error>;
439-
440-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
441-
self.stream.as_mut().poll_next_unpin(cx)
442-
}
443-
444-
fn size_hint(&self) -> (usize, Option<usize>) {
445-
self.stream.size_hint()
446-
}
447-
}
448-
449-
impl<T> Unpin for LocalIncoming<T> {}

0 commit comments

Comments
 (0)