Skip to content

Commit cb52c5e

Browse files
Markus Westerlindtaiki-e
authored andcommitted
feat: Add sink::unfold
A more general function than `from_fn` (#2254), as suggested in #2254 (comment)
1 parent 55a31e5 commit cb52c5e

File tree

4 files changed

+174
-37
lines changed

4 files changed

+174
-37
lines changed

futures-util/src/sink/mod.rs

Lines changed: 52 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
//! This module is only available when the `sink` feature of this
77
//! library is activated, and it is activated by default.
88
9+
use crate::future::Either;
910
use core::pin::Pin;
1011
use futures_core::future::Future;
1112
use futures_core::stream::{Stream, TryStream};
1213
use futures_core::task::{Context, Poll};
13-
use crate::future::Either;
1414

1515
#[cfg(feature = "compat")]
1616
use crate::compat::CompatSink;
@@ -41,6 +41,9 @@ pub use self::send::Send;
4141
mod send_all;
4242
pub use self::send_all::SendAll;
4343

44+
mod unfold;
45+
pub use self::unfold::{unfold, Unfold};
46+
4447
mod with;
4548
pub use self::with::With;
4649

@@ -69,10 +72,11 @@ pub trait SinkExt<Item>: Sink<Item> {
6972
/// Note that this function consumes the given sink, returning a wrapped
7073
/// version, much like `Iterator::map`.
7174
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
72-
where F: FnMut(U) -> Fut,
73-
Fut: Future<Output = Result<Item, E>>,
74-
E: From<Self::Error>,
75-
Self: Sized
75+
where
76+
F: FnMut(U) -> Fut,
77+
Fut: Future<Output = Result<Item, E>>,
78+
E: From<Self::Error>,
79+
Self: Sized,
7680
{
7781
With::new(self, f)
7882
}
@@ -110,9 +114,10 @@ pub trait SinkExt<Item>: Sink<Item> {
110114
/// # });
111115
/// ```
112116
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
113-
where F: FnMut(U) -> St,
114-
St: Stream<Item = Result<Item, Self::Error>>,
115-
Self: Sized
117+
where
118+
F: FnMut(U) -> St,
119+
St: Stream<Item = Result<Item, Self::Error>>,
120+
Self: Sized,
116121
{
117122
WithFlatMap::new(self, f)
118123
}
@@ -133,8 +138,9 @@ pub trait SinkExt<Item>: Sink<Item> {
133138

134139
/// Transforms the error returned by the sink.
135140
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
136-
where F: FnOnce(Self::Error) -> E,
137-
Self: Sized,
141+
where
142+
F: FnOnce(Self::Error) -> E,
143+
Self: Sized,
138144
{
139145
SinkMapErr::new(self, f)
140146
}
@@ -143,13 +149,13 @@ pub trait SinkExt<Item>: Sink<Item> {
143149
///
144150
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
145151
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
146-
where Self: Sized,
147-
Self::Error: Into<E>,
152+
where
153+
Self: Sized,
154+
Self::Error: Into<E>,
148155
{
149156
SinkErrInto::new(self)
150157
}
151158

152-
153159
/// Adds a fixed-size buffer to the current sink.
154160
///
155161
/// The resulting sink will buffer up to `capacity` items when the
@@ -164,14 +170,16 @@ pub trait SinkExt<Item>: Sink<Item> {
164170
/// library is activated, and it is activated by default.
165171
#[cfg(feature = "alloc")]
166172
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
167-
where Self: Sized,
173+
where
174+
Self: Sized,
168175
{
169176
Buffer::new(self, capacity)
170177
}
171178

172179
/// Close the sink.
173180
fn close(&mut self) -> Close<'_, Self, Item>
174-
where Self: Unpin,
181+
where
182+
Self: Unpin,
175183
{
176184
Close::new(self)
177185
}
@@ -181,9 +189,10 @@ pub trait SinkExt<Item>: Sink<Item> {
181189
/// This adapter clones each incoming item and forwards it to both this as well as
182190
/// the other sink at the same time.
183191
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
184-
where Self: Sized,
185-
Item: Clone,
186-
Si: Sink<Item, Error=Self::Error>
192+
where
193+
Self: Sized,
194+
Item: Clone,
195+
Si: Sink<Item, Error = Self::Error>,
187196
{
188197
Fanout::new(self, other)
189198
}
@@ -193,7 +202,8 @@ pub trait SinkExt<Item>: Sink<Item> {
193202
/// This adapter is intended to be used when you want to stop sending to the sink
194203
/// until all current requests are processed.
195204
fn flush(&mut self) -> Flush<'_, Self, Item>
196-
where Self: Unpin,
205+
where
206+
Self: Unpin,
197207
{
198208
Flush::new(self)
199209
}
@@ -205,7 +215,8 @@ pub trait SinkExt<Item>: Sink<Item> {
205215
/// to batch together items to send via `send_all`, rather than flushing
206216
/// between each item.**
207217
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
208-
where Self: Unpin,
218+
where
219+
Self: Unpin,
209220
{
210221
Send::new(self, item)
211222
}
@@ -221,12 +232,10 @@ pub trait SinkExt<Item>: Sink<Item> {
221232
/// Doing `sink.send_all(stream)` is roughly equivalent to
222233
/// `stream.forward(sink)`. The returned future will exhaust all items from
223234
/// `stream` and send them to `self`.
224-
fn send_all<'a, St>(
225-
&'a mut self,
226-
stream: &'a mut St
227-
) -> SendAll<'a, Self, St>
228-
where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
229-
Self: Unpin,
235+
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
236+
where
237+
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
238+
Self: Unpin,
230239
{
231240
SendAll::new(self, stream)
232241
}
@@ -237,8 +246,9 @@ pub trait SinkExt<Item>: Sink<Item> {
237246
/// This can be used in combination with the `right_sink` method to write `if`
238247
/// statements that evaluate to different streams in different branches.
239248
fn left_sink<Si2>(self) -> Either<Self, Si2>
240-
where Si2: Sink<Item, Error = Self::Error>,
241-
Self: Sized
249+
where
250+
Si2: Sink<Item, Error = Self::Error>,
251+
Self: Sized,
242252
{
243253
Either::Left(self)
244254
}
@@ -249,8 +259,9 @@ pub trait SinkExt<Item>: Sink<Item> {
249259
/// This can be used in combination with the `left_sink` method to write `if`
250260
/// statements that evaluate to different streams in different branches.
251261
fn right_sink<Si1>(self) -> Either<Si1, Self>
252-
where Si1: Sink<Item, Error = Self::Error>,
253-
Self: Sized
262+
where
263+
Si1: Sink<Item, Error = Self::Error>,
264+
Self: Sized,
254265
{
255266
Either::Right(self)
256267
}
@@ -260,39 +271,44 @@ pub trait SinkExt<Item>: Sink<Item> {
260271
#[cfg(feature = "compat")]
261272
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
262273
fn compat(self) -> CompatSink<Self, Item>
263-
where Self: Sized + Unpin,
274+
where
275+
Self: Sized + Unpin,
264276
{
265277
CompatSink::new(self)
266278
}
267-
279+
268280
/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
269281
/// sink types.
270282
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
271-
where Self: Unpin
283+
where
284+
Self: Unpin,
272285
{
273286
Pin::new(self).poll_ready(cx)
274287
}
275288

276289
/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
277290
/// sink types.
278291
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
279-
where Self: Unpin
292+
where
293+
Self: Unpin,
280294
{
281295
Pin::new(self).start_send(item)
282296
}
283297

284298
/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
285299
/// sink types.
286300
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
287-
where Self: Unpin
301+
where
302+
Self: Unpin,
288303
{
289304
Pin::new(self).poll_flush(cx)
290305
}
291306

292307
/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
293308
/// sink types.
294309
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
295-
where Self: Unpin
310+
where
311+
Self: Unpin,
296312
{
297313
Pin::new(self).poll_close(cx)
298314
}

futures-util/src/sink/unfold.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use core::{future::Future, pin::Pin};
2+
use futures_core::task::{Context, Poll};
3+
use futures_sink::Sink;
4+
use pin_project::pin_project;
5+
6+
/// Sink for the [`unfold`] function.
7+
#[pin_project]
8+
#[derive(Debug)]
9+
#[must_use = "sinks do nothing unless polled"]
10+
pub struct Unfold<T, F, R> {
11+
state: Option<T>,
12+
function: F,
13+
#[pin]
14+
future: Option<R>,
15+
}
16+
17+
/// Create a sink from a function which processes one item at a time.
18+
///
19+
/// # Examples
20+
///
21+
/// ```
22+
/// # futures::executor::block_on(async {
23+
/// use futures::sink::{self, SinkExt};
24+
///
25+
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
26+
/// async move {
27+
/// sum += i;
28+
/// eprintln!("{}", i);
29+
/// Ok::<_, futures::never::Never>(sum)
30+
/// }
31+
/// });
32+
/// futures::pin_mut!(unfold);
33+
/// unfold.send(5).await?;
34+
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
35+
/// ```
36+
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
37+
Unfold {
38+
state: Some(init),
39+
function,
40+
future: None,
41+
}
42+
}
43+
44+
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
45+
where
46+
F: FnMut(T, Item) -> R,
47+
R: Future<Output = Result<T, E>>,
48+
{
49+
type Error = E;
50+
51+
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52+
self.poll_flush(cx)
53+
}
54+
55+
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
56+
let mut this = self.project();
57+
debug_assert!(this.future.is_none());
58+
let future = (this.function)(this.state.take().unwrap(), item);
59+
this.future.set(Some(future));
60+
Ok(())
61+
}
62+
63+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64+
let mut this = self.project();
65+
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
66+
let result = match ready!(future.poll(cx)) {
67+
Ok(state) => {
68+
*this.state = Some(state);
69+
Ok(())
70+
}
71+
Err(err) => Err(err),
72+
};
73+
this.future.set(None);
74+
result
75+
} else {
76+
Ok(())
77+
})
78+
}
79+
80+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81+
self.poll_flush(cx)
82+
}
83+
}

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ pub mod sink {
420420

421421
pub use futures_util::sink::{
422422
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
423-
SinkExt, Fanout, Drain, drain,
423+
SinkExt, Fanout, Drain, drain, Unfold, unfold,
424424
WithFlatMap,
425425
};
426426

futures/tests/sink.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,44 @@ fn sink_map_err() {
610610
);
611611
}
612612

613+
#[test]
614+
fn sink_unfold() {
615+
use futures::channel::mpsc;
616+
use futures::executor::block_on;
617+
use futures::future::poll_fn;
618+
use futures::sink::{self, Sink, SinkExt};
619+
use futures::task::Poll;
620+
621+
block_on(poll_fn(|cx| {
622+
let (tx, mut rx) = mpsc::channel(1);
623+
let unfold = sink::unfold((), |(), i: i32| {
624+
let mut tx = tx.clone();
625+
async move {
626+
tx.send(i).await.unwrap();
627+
Ok::<_, String>(())
628+
}
629+
});
630+
futures::pin_mut!(unfold);
631+
assert_eq!(unfold.as_mut().start_send(1), Ok(()));
632+
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
633+
assert_eq!(rx.try_next().unwrap(), Some(1));
634+
635+
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
636+
assert_eq!(unfold.as_mut().start_send(2), Ok(()));
637+
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
638+
assert_eq!(unfold.as_mut().start_send(3), Ok(()));
639+
assert_eq!(rx.try_next().unwrap(), Some(2));
640+
assert!(rx.try_next().is_err());
641+
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
642+
assert_eq!(unfold.as_mut().start_send(4), Ok(()));
643+
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
644+
assert_eq!(rx.try_next().unwrap(), Some(3));
645+
assert_eq!(rx.try_next().unwrap(), Some(4));
646+
647+
Poll::Ready(())
648+
}))
649+
}
650+
613651
#[test]
614652
fn err_into() {
615653
use futures::channel::mpsc;

0 commit comments

Comments
 (0)