Skip to content

Commit f0df0a1

Browse files
committed
stream|util: revert and update util and stream annotations
1 parent 2ea3529 commit f0df0a1

File tree

13 files changed

+200
-227
lines changed

13 files changed

+200
-227
lines changed

tokio-stream/src/empty.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,12 @@ unsafe impl<T> Sync for Empty<T> {}
2626
/// ```
2727
/// use tokio_stream::{self as stream, StreamExt};
2828
///
29-
/// # /*
30-
/// #[tokio::main]
31-
/// # */
3229
/// # #[tokio::main(flavor = "current_thread")]
33-
/// async fn main() {
34-
/// let mut none = stream::empty::<i32>();
30+
/// # async fn main() {
31+
/// let mut none = stream::empty::<i32>();
3532
///
36-
/// assert_eq!(None, none.next().await);
37-
/// }
33+
/// assert_eq!(None, none.next().await);
34+
/// # }
3835
/// ```
3936
pub const fn empty<T>() -> Empty<T> {
4037
Empty(PhantomData)

tokio-stream/src/lib.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,14 @@
3434
//! ```rust
3535
//! use tokio_stream::{self as stream, StreamExt};
3636
//!
37-
//! # /*
38-
//! #[tokio::main]
39-
//! # */
4037
//! # #[tokio::main(flavor = "current_thread")]
41-
//! async fn main() {
42-
//! let mut stream = stream::iter(vec![0, 1, 2]);
38+
//! # async fn main() {
39+
//! let mut stream = stream::iter(vec![0, 1, 2]);
4340
//!
44-
//! while let Some(value) = stream.next().await {
45-
//! println!("Got {}", value);
46-
//! }
41+
//! while let Some(value) = stream.next().await {
42+
//! println!("Got {}", value);
4743
//! }
44+
//! # }
4845
//! ```
4946
//!
5047
//! # Returning a Stream from a function

tokio-stream/src/once.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,16 @@ impl<I> Unpin for Once<I> {}
2222
/// ```
2323
/// use tokio_stream::{self as stream, StreamExt};
2424
///
25-
/// # /*
26-
/// #[tokio::main]
27-
/// # */
2825
/// # #[tokio::main(flavor = "current_thread")]
29-
/// async fn main() {
30-
/// // one is the loneliest number
31-
/// let mut one = stream::once(1);
26+
/// # async fn main() {
27+
/// // one is the loneliest number
28+
/// let mut one = stream::once(1);
3229
///
33-
/// assert_eq!(Some(1), one.next().await);
30+
/// assert_eq!(Some(1), one.next().await);
3431
///
35-
/// // just one, that's all we get
36-
/// assert_eq!(None, one.next().await);
37-
/// }
32+
/// // just one, that's all we get
33+
/// assert_eq!(None, one.next().await);
34+
/// # }
3835
/// ```
3936
pub fn once<T>(value: T) -> Once<T> {
4037
Once {

tokio-stream/src/stream_close.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,20 @@ pin_project! {
1717
/// ```
1818
/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
1919
///
20-
/// # /*
21-
/// #[tokio::main]
22-
/// # */
2320
/// # #[tokio::main(flavor = "current_thread")]
24-
/// async fn main() {
25-
/// let mut map = StreamMap::new();
26-
/// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
27-
/// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
28-
/// map.insert(0, stream);
29-
/// map.insert(1, stream2);
30-
/// while let Some((key, val)) = map.next().await {
31-
/// match val {
32-
/// Some(val) => println!("got {val:?} from stream {key:?}"),
33-
/// None => println!("stream {key:?} closed"),
34-
/// }
21+
/// # async fn main() {
22+
/// let mut map = StreamMap::new();
23+
/// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
24+
/// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
25+
/// map.insert(0, stream);
26+
/// map.insert(1, stream2);
27+
/// while let Some((key, val)) = map.next().await {
28+
/// match val {
29+
/// Some(val) => println!("got {val:?} from stream {key:?}"),
30+
/// None => println!("stream {key:?} closed"),
3531
/// }
3632
/// }
33+
/// # }
3734
/// ```
3835
#[must_use = "streams do nothing unless polled"]
3936
pub struct StreamNotifyClose<S> {

tokio-stream/src/stream_ext.rs

Lines changed: 74 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -338,62 +338,59 @@ pub trait StreamExt: Stream {
338338
/// use std::time::Duration;
339339
/// use std::pin::Pin;
340340
///
341-
/// # /*
342-
/// #[tokio::main]
343-
/// # */
344341
/// # #[tokio::main(flavor = "current_thread")]
345-
/// async fn main() {
342+
/// # async fn main() {
346343
/// # time::pause();
347-
/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
348-
/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
344+
/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
345+
/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
349346
///
350-
/// // Convert the channels to a `Stream`.
351-
/// let rx1 = Box::pin(async_stream::stream! {
352-
/// while let Some(item) = rx1.recv().await {
353-
/// yield item;
354-
/// }
355-
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
347+
/// // Convert the channels to a `Stream`.
348+
/// let rx1 = Box::pin(async_stream::stream! {
349+
/// while let Some(item) = rx1.recv().await {
350+
/// yield item;
351+
/// }
352+
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
356353
///
357-
/// let rx2 = Box::pin(async_stream::stream! {
358-
/// while let Some(item) = rx2.recv().await {
359-
/// yield item;
360-
/// }
361-
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
354+
/// let rx2 = Box::pin(async_stream::stream! {
355+
/// while let Some(item) = rx2.recv().await {
356+
/// yield item;
357+
/// }
358+
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
362359
///
363-
/// let mut rx = rx1.merge(rx2);
360+
/// let mut rx = rx1.merge(rx2);
364361
///
365-
/// tokio::spawn(async move {
366-
/// // Send some values immediately
367-
/// tx1.send(1).await.unwrap();
368-
/// tx1.send(2).await.unwrap();
362+
/// tokio::spawn(async move {
363+
/// // Send some values immediately
364+
/// tx1.send(1).await.unwrap();
365+
/// tx1.send(2).await.unwrap();
369366
///
370-
/// // Let the other task send values
371-
/// time::sleep(Duration::from_millis(20)).await;
367+
/// // Let the other task send values
368+
/// time::sleep(Duration::from_millis(20)).await;
372369
///
373-
/// tx1.send(4).await.unwrap();
374-
/// });
370+
/// tx1.send(4).await.unwrap();
371+
/// });
375372
///
376-
/// tokio::spawn(async move {
377-
/// // Wait for the first task to send values
378-
/// time::sleep(Duration::from_millis(5)).await;
373+
/// tokio::spawn(async move {
374+
/// // Wait for the first task to send values
375+
/// time::sleep(Duration::from_millis(5)).await;
379376
///
380-
/// tx2.send(3).await.unwrap();
377+
/// tx2.send(3).await.unwrap();
381378
///
382-
/// time::sleep(Duration::from_millis(25)).await;
379+
/// time::sleep(Duration::from_millis(25)).await;
383380
///
384-
/// // Send the final value
385-
/// tx2.send(5).await.unwrap();
386-
/// });
381+
/// // Send the final value
382+
/// tx2.send(5).await.unwrap();
383+
/// });
387384
///
388-
/// assert_eq!(1, rx.next().await.unwrap());
389-
/// assert_eq!(2, rx.next().await.unwrap());
390-
/// assert_eq!(3, rx.next().await.unwrap());
391-
/// assert_eq!(4, rx.next().await.unwrap());
392-
/// assert_eq!(5, rx.next().await.unwrap());
385+
/// assert_eq!(1, rx.next().await.unwrap());
386+
/// assert_eq!(2, rx.next().await.unwrap());
387+
/// assert_eq!(3, rx.next().await.unwrap());
388+
/// assert_eq!(4, rx.next().await.unwrap());
389+
/// assert_eq!(5, rx.next().await.unwrap());
393390
///
394-
/// // The merged stream is consumed
395-
/// assert!(rx.next().await.is_none());
396-
/// }
391+
/// // The merged stream is consumed
392+
/// assert!(rx.next().await.is_none());
393+
/// # }
397394
/// ```
398395
fn merge<U>(self, other: U) -> Merge<Self, U>
399396
where
@@ -791,24 +788,21 @@ pub trait StreamExt: Stream {
791788
/// ```
792789
/// use tokio_stream::{self as stream, StreamExt};
793790
///
794-
/// # /*
795-
/// #[tokio::main]
796-
/// # */
797791
/// # #[tokio::main(flavor = "current_thread")]
798-
/// async fn main() {
799-
/// let one = stream::iter(vec![1, 2, 3]);
800-
/// let two = stream::iter(vec![4, 5, 6]);
792+
/// # async fn main() {
793+
/// let one = stream::iter(vec![1, 2, 3]);
794+
/// let two = stream::iter(vec![4, 5, 6]);
801795
///
802-
/// let mut stream = one.chain(two);
796+
/// let mut stream = one.chain(two);
803797
///
804-
/// assert_eq!(stream.next().await, Some(1));
805-
/// assert_eq!(stream.next().await, Some(2));
806-
/// assert_eq!(stream.next().await, Some(3));
807-
/// assert_eq!(stream.next().await, Some(4));
808-
/// assert_eq!(stream.next().await, Some(5));
809-
/// assert_eq!(stream.next().await, Some(6));
810-
/// assert_eq!(stream.next().await, None);
811-
/// }
798+
/// assert_eq!(stream.next().await, Some(1));
799+
/// assert_eq!(stream.next().await, Some(2));
800+
/// assert_eq!(stream.next().await, Some(3));
801+
/// assert_eq!(stream.next().await, Some(4));
802+
/// assert_eq!(stream.next().await, Some(5));
803+
/// assert_eq!(stream.next().await, Some(6));
804+
/// assert_eq!(stream.next().await, None);
805+
/// # }
812806
/// ```
813807
fn chain<U>(self, other: U) -> Chain<Self, U>
814808
where
@@ -881,49 +875,43 @@ pub trait StreamExt: Stream {
881875
/// ```
882876
/// use tokio_stream::{self as stream, StreamExt};
883877
///
884-
/// # /*
885-
/// #[tokio::main]
886-
/// # */
887878
/// # #[tokio::main(flavor = "current_thread")]
888-
/// async fn main() {
889-
/// let doubled: Vec<i32> =
890-
/// stream::iter(vec![1, 2, 3])
891-
/// .map(|x| x * 2)
892-
/// .collect()
893-
/// .await;
879+
/// # async fn main() {
880+
/// let doubled: Vec<i32> =
881+
/// stream::iter(vec![1, 2, 3])
882+
/// .map(|x| x * 2)
883+
/// .collect()
884+
/// .await;
894885
///
895-
/// assert_eq!(vec![2, 4, 6], doubled);
896-
/// }
886+
/// assert_eq!(vec![2, 4, 6], doubled);
887+
/// # }
897888
/// ```
898889
///
899890
/// Collecting a stream of `Result` values
900891
///
901892
/// ```
902893
/// use tokio_stream::{self as stream, StreamExt};
903894
///
904-
/// # /*
905-
/// #[tokio::main]
906-
/// # */
907895
/// # #[tokio::main(flavor = "current_thread")]
908-
/// async fn main() {
909-
/// // A stream containing only `Ok` values will be collected
910-
/// let values: Result<Vec<i32>, &str> =
911-
/// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
912-
/// .collect()
913-
/// .await;
896+
/// # async fn main() {
897+
/// // A stream containing only `Ok` values will be collected
898+
/// let values: Result<Vec<i32>, &str> =
899+
/// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
900+
/// .collect()
901+
/// .await;
914902
///
915-
/// assert_eq!(Ok(vec![1, 2, 3]), values);
903+
/// assert_eq!(Ok(vec![1, 2, 3]), values);
916904
///
917-
/// // A stream containing `Err` values will return the first error.
918-
/// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
905+
/// // A stream containing `Err` values will return the first error.
906+
/// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
919907
///
920-
/// let values: Result<Vec<i32>, &str> =
921-
/// stream::iter(results)
922-
/// .collect()
923-
/// .await;
908+
/// let values: Result<Vec<i32>, &str> =
909+
/// stream::iter(results)
910+
/// .collect()
911+
/// .await;
924912
///
925-
/// assert_eq!(Err("no"), values);
926-
/// }
913+
/// assert_eq!(Err("no"), values);
914+
/// # }
927915
/// ```
928916
fn collect<T>(self) -> Collect<Self, T>
929917
where

0 commit comments

Comments
 (0)