File tree Expand file tree Collapse file tree 3 files changed +30
-27
lines changed Expand file tree Collapse file tree 3 files changed +30
-27
lines changed Original file line number Diff line number Diff line change @@ -14,6 +14,7 @@ repository = "https://github.com/tokio-rs/async-stream"
14
14
[dependencies ]
15
15
async-stream-impl = { version = " =0.3.3" , path = " ../async-stream-impl" }
16
16
futures-core = " 0.3"
17
+ pin-project-lite = " 0.2"
17
18
18
19
[dev-dependencies ]
19
20
futures-util = " 0.3"
Original file line number Diff line number Diff line change 1
1
use crate :: yielder:: Receiver ;
2
2
3
3
use futures_core:: { FusedStream , Stream } ;
4
+ use pin_project_lite:: pin_project;
4
5
use std:: future:: Future ;
5
6
use std:: pin:: Pin ;
6
7
use std:: task:: { Context , Poll } ;
7
8
8
- #[ doc( hidden) ]
9
- #[ derive( Debug ) ]
10
- pub struct AsyncStream < T , U > {
11
- rx : Receiver < T > ,
12
- done : bool ,
13
- generator : U ,
9
+ pin_project ! {
10
+ #[ doc( hidden) ]
11
+ #[ derive( Debug ) ]
12
+ pub struct AsyncStream <T , U > {
13
+ rx: Receiver <T >,
14
+ done: bool ,
15
+ #[ pin]
16
+ generator: U ,
17
+ }
14
18
}
15
19
16
20
impl < T , U > AsyncStream < T , U > {
@@ -40,30 +44,28 @@ where
40
44
type Item = T ;
41
45
42
46
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
43
- unsafe {
44
- let me = Pin :: get_unchecked_mut ( self ) ;
47
+ let me = self . project ( ) ;
45
48
46
- if me. done {
47
- return Poll :: Ready ( None ) ;
48
- }
49
+ if * me. done {
50
+ return Poll :: Ready ( None ) ;
51
+ }
49
52
50
- let mut dst = None ;
51
- let res = {
52
- let _enter = me. rx . enter ( & mut dst) ;
53
- Pin :: new_unchecked ( & mut me. generator ) . poll ( cx)
54
- } ;
53
+ let mut dst = None ;
54
+ let res = {
55
+ let _enter = me. rx . enter ( & mut dst) ;
56
+ me. generator . poll ( cx)
57
+ } ;
55
58
56
- me. done = res. is_ready ( ) ;
59
+ * me. done = res. is_ready ( ) ;
57
60
58
- if dst. is_some ( ) {
59
- return Poll :: Ready ( dst. take ( ) ) ;
60
- }
61
+ if dst. is_some ( ) {
62
+ return Poll :: Ready ( dst. take ( ) ) ;
63
+ }
61
64
62
- if me. done {
63
- Poll :: Ready ( None )
64
- } else {
65
- Poll :: Pending
66
- }
65
+ if * me. done {
66
+ Poll :: Ready ( None )
67
+ } else {
68
+ Poll :: Pending
67
69
}
68
70
}
69
71
Original file line number Diff line number Diff line change @@ -53,9 +53,9 @@ impl<T> Future for Send<T> {
53
53
return Poll :: Ready ( ( ) ) ;
54
54
}
55
55
56
- STORE . with ( |cell| unsafe {
56
+ STORE . with ( |cell| {
57
57
let ptr = cell. get ( ) as * mut Option < T > ;
58
- let option_ref = ptr. as_mut ( ) . expect ( "invalid usage" ) ;
58
+ let option_ref = unsafe { ptr. as_mut ( ) } . expect ( "invalid usage" ) ;
59
59
60
60
if option_ref. is_none ( ) {
61
61
* option_ref = self . value . take ( ) ;
You can’t perform that action at this time.
0 commit comments