4
4
use std:: {
5
5
io,
6
6
pin:: Pin ,
7
- task:: { ready , Context , Poll } ,
7
+ task:: { Context , Poll } ,
8
8
} ;
9
9
10
10
use async_compression:: tokio:: write:: ZstdEncoder ;
@@ -13,7 +13,8 @@ use tokio::io::{AsyncWrite, AsyncWriteExt as _};
13
13
/// <https://github.com/Nullus157/async-compression/issues/246>
14
14
#[ tokio:: test]
15
15
async fn issue_246 ( ) {
16
- let mut zstd_encoder = Transparent :: new ( ZstdEncoder :: new ( DelayedShutdown :: default ( ) ) ) ;
16
+ let mut zstd_encoder =
17
+ Transparent :: new ( Trace :: new ( ZstdEncoder :: new ( DelayedShutdown :: default ( ) ) ) ) ;
17
18
zstd_encoder. shutdown ( ) . await . unwrap ( ) ;
18
19
}
19
20
@@ -36,11 +37,17 @@ impl<T: AsyncWrite> AsyncWrite for Transparent<T> {
36
37
cx : & mut Context < ' _ > ,
37
38
buf : & [ u8 ] ,
38
39
) -> Poll < Result < usize , io:: Error > > {
39
- self . project ( ) . inner . poll_write ( cx, buf)
40
+ eprintln ! ( "Transparent::poll_write = ..." ) ;
41
+ let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
42
+ eprintln ! ( "Transparent::poll_write = {:?}" , ret) ;
43
+ ret
40
44
}
41
45
42
46
fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
43
- self . project ( ) . inner . poll_flush ( cx)
47
+ eprintln ! ( "Transparent::poll_flush = ..." ) ;
48
+ let ret = self . project ( ) . inner . poll_flush ( cx) ;
49
+ eprintln ! ( "Transparent::poll_flush = {:?}" , ret) ;
50
+ ret
44
51
}
45
52
46
53
/// To quote the [`AsyncWrite`] docs:
@@ -49,9 +56,15 @@ impl<T: AsyncWrite> AsyncWrite for Transparent<T> {
49
56
/// > That is, callers don't need to call flush before calling shutdown.
50
57
/// > They can rely that by calling shutdown any pending buffered data will be written out.
51
58
fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
59
+ eprintln ! ( "Transparent::poll_shutdown = ..." ) ;
52
60
let mut this = self . project ( ) ;
53
- ready ! ( this. inner. as_mut( ) . poll_flush( cx) ) ?;
54
- this. inner . poll_shutdown ( cx)
61
+ let ret = match this. inner . as_mut ( ) . poll_flush ( cx) {
62
+ Poll :: Ready ( Ok ( ( ) ) ) => this. inner . poll_shutdown ( cx) ,
63
+ Poll :: Ready ( Err ( e) ) => Poll :: Ready ( Err ( e) ) ,
64
+ Poll :: Pending => Poll :: Pending ,
65
+ } ;
66
+ eprintln ! ( "Transparent::poll_shutdown = {:?}" , ret) ;
67
+ ret
55
68
}
56
69
}
57
70
@@ -70,24 +83,73 @@ impl AsyncWrite for DelayedShutdown {
70
83
cx : & mut Context < ' _ > ,
71
84
buf : & [ u8 ] ,
72
85
) -> Poll < Result < usize , io:: Error > > {
86
+ eprintln ! ( "DelayedShutdown::poll_write = ..." ) ;
73
87
let _ = cx;
74
88
self . project ( ) . contents . extend_from_slice ( buf) ;
75
- Poll :: Ready ( Ok ( buf. len ( ) ) )
89
+ let ret = Poll :: Ready ( Ok ( buf. len ( ) ) ) ;
90
+ eprintln ! ( "DelayedShutdown::poll_write = {:?}" , ret) ;
91
+ ret
76
92
}
77
93
78
94
fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
95
+ eprintln ! ( "DelayedShutdown::poll_flush = ..." ) ;
79
96
let _ = cx;
80
- Poll :: Ready ( Ok ( ( ) ) )
97
+ let ret = Poll :: Ready ( Ok ( ( ) ) ) ;
98
+ eprintln ! ( "DelayedShutdown::poll_flush = {:?}" , ret) ;
99
+ ret
81
100
}
82
101
83
102
fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
84
- match self . project ( ) . num_times_shutdown_called {
103
+ eprintln ! ( "DelayedShutdown::poll_shutdown = ..." ) ;
104
+ let ret = match self . project ( ) . num_times_shutdown_called {
85
105
it @ 0 => {
86
106
* it += 1 ;
87
107
cx. waker ( ) . wake_by_ref ( ) ;
88
108
Poll :: Pending
89
109
}
90
110
_ => Poll :: Ready ( Ok ( ( ) ) ) ,
91
- }
111
+ } ;
112
+ eprintln ! ( "DelayedShutdown::poll_shutdown = {:?}" , ret) ;
113
+ ret
114
+ }
115
+ }
116
+
117
+ pin_project_lite:: pin_project! {
118
+ /// A wrapper which traces all calls
119
+ struct Trace <T > {
120
+ #[ pin] inner: T
121
+ }
122
+ }
123
+
124
+ impl < T > Trace < T > {
125
+ fn new ( inner : T ) -> Self {
126
+ Self { inner }
127
+ }
128
+ }
129
+
130
+ impl < T : AsyncWrite > AsyncWrite for Trace < T > {
131
+ fn poll_write (
132
+ self : Pin < & mut Self > ,
133
+ cx : & mut Context < ' _ > ,
134
+ buf : & [ u8 ] ,
135
+ ) -> Poll < Result < usize , io:: Error > > {
136
+ eprintln ! ( "Trace::poll_write = ..." ) ;
137
+ let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
138
+ eprintln ! ( "Trace::poll_write = {:?}" , ret) ;
139
+ ret
140
+ }
141
+
142
+ fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
143
+ eprintln ! ( "Trace::poll_flush = ..." ) ;
144
+ let ret = self . project ( ) . inner . poll_flush ( cx) ;
145
+ eprintln ! ( "Trace::poll_flush = {:?}" , ret) ;
146
+ ret
147
+ }
148
+
149
+ fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
150
+ eprintln ! ( "Trace::poll_shutdown = ..." ) ;
151
+ let ret = self . project ( ) . inner . poll_shutdown ( cx) ;
152
+ eprintln ! ( "Trace::poll_shutdown = {:?}" , ret) ;
153
+ ret
92
154
}
93
155
}
0 commit comments