@@ -253,8 +253,14 @@ impl Inner {
253253
254254#[ cfg( test) ]
255255mod tests {
256+ use std:: task:: Poll ;
257+ use std:: time:: Duration ;
258+
259+ use actix_rt:: time:: timeout;
256260 use actix_utils:: future:: poll_fn;
261+ use futures_util:: { FutureExt , StreamExt } ;
257262 use static_assertions:: { assert_impl_all, assert_not_impl_any} ;
263+ use tokio:: sync:: oneshot;
258264
259265 use super :: * ;
260266
@@ -263,6 +269,67 @@ mod tests {
263269
264270 assert_impl_all ! ( Inner : Unpin , Send , Sync ) ;
265271
272+ const WAKE_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
273+
274+ fn prepare_waking_test (
275+ mut payload : Payload ,
276+ expected : Option < Result < ( ) , ( ) > > ,
277+ ) -> ( oneshot:: Receiver < ( ) > , actix_rt:: task:: JoinHandle < ( ) > ) {
278+ let ( tx, rx) = oneshot:: channel ( ) ;
279+
280+ let handle = actix_rt:: spawn ( async move {
281+ // Make sure to poll once to set the waker
282+ poll_fn ( |cx| {
283+ assert ! ( payload. poll_next_unpin( cx) . is_pending( ) ) ;
284+ Poll :: Ready ( ( ) )
285+ } )
286+ . await ;
287+ tx. send ( ( ) ) . unwrap ( ) ;
288+
289+ // actix-rt is single-threaded, so this won't race with `rx.await`
290+ let mut pend_once = false ;
291+ poll_fn ( |_| {
292+ if pend_once {
293+ Poll :: Ready ( ( ) )
294+ } else {
295+ // Return pending without storing wakers, we already did on the previous
296+ // `poll_fn`, now this task will only continue if the `sender` wakes us
297+ pend_once = true ;
298+ Poll :: Pending
299+ }
300+ } )
301+ . await ;
302+
303+ let got = payload. next ( ) . now_or_never ( ) . unwrap ( ) ;
304+ match expected {
305+ Some ( Ok ( _) ) => assert ! ( got. unwrap( ) . is_ok( ) ) ,
306+ Some ( Err ( _) ) => assert ! ( got. unwrap( ) . is_err( ) ) ,
307+ None => assert ! ( got. is_none( ) ) ,
308+ }
309+ } ) ;
310+ ( rx, handle)
311+ }
312+
313+ #[ actix_rt:: test]
314+ async fn wake_on_error ( ) {
315+ let ( mut sender, payload) = Payload :: create ( false ) ;
316+ let ( rx, handle) = prepare_waking_test ( payload, Some ( Err ( ( ) ) ) ) ;
317+
318+ rx. await . unwrap ( ) ;
319+ sender. set_error ( PayloadError :: Incomplete ( None ) ) ;
320+ timeout ( WAKE_TIMEOUT , handle) . await . unwrap ( ) . unwrap ( ) ;
321+ }
322+
323+ #[ actix_rt:: test]
324+ async fn wake_on_eof ( ) {
325+ let ( mut sender, payload) = Payload :: create ( false ) ;
326+ let ( rx, handle) = prepare_waking_test ( payload, None ) ;
327+
328+ rx. await . unwrap ( ) ;
329+ sender. feed_eof ( ) ;
330+ timeout ( WAKE_TIMEOUT , handle) . await . unwrap ( ) . unwrap ( ) ;
331+ }
332+
266333 #[ actix_rt:: test]
267334 async fn test_unread_data ( ) {
268335 let ( _, mut payload) = Payload :: create ( false ) ;
0 commit comments