@@ -353,6 +353,123 @@ def run_in_thread():
353353        t .join ()
354354        self .assertEqual (results , ['hello' , 'world' ])
355355
356+     def  test_call_soon_threadsafe_handle_block_check_cancelled (self ):
357+         results  =  []
358+ 
359+         callback_started  =  threading .Event ()
360+         callback_finished  =  threading .Event ()
361+         def  callback (arg ):
362+             callback_started .set ()
363+             results .append (arg )
364+             time .sleep (1 )
365+             callback_finished .set ()
366+ 
367+         def  run_in_thread ():
368+             handle  =  self .loop .call_soon_threadsafe (callback , 'hello' )
369+             self .assertIsInstance (handle , events ._ThreadSafeHandle )
370+             callback_started .wait ()
371+             # callback started so it should block checking for cancellation 
372+             # until it finishes 
373+             self .assertFalse (handle .cancelled ())
374+             self .assertTrue (callback_finished .is_set ())
375+             self .loop .call_soon_threadsafe (self .loop .stop )
376+ 
377+         t  =  threading .Thread (target = run_in_thread )
378+         t .start ()
379+ 
380+         self .loop .run_forever ()
381+         t .join ()
382+         self .assertEqual (results , ['hello' ])
383+ 
384+     def  test_call_soon_threadsafe_handle_block_cancellation (self ):
385+         results  =  []
386+ 
387+         callback_started  =  threading .Event ()
388+         callback_finished  =  threading .Event ()
389+         def  callback (arg ):
390+             callback_started .set ()
391+             results .append (arg )
392+             time .sleep (1 )
393+             callback_finished .set ()
394+ 
395+         def  run_in_thread ():
396+             handle  =  self .loop .call_soon_threadsafe (callback , 'hello' )
397+             self .assertIsInstance (handle , events ._ThreadSafeHandle )
398+             callback_started .wait ()
399+             # callback started so it should not be cancel it from other thread until it finishes 
400+             handle .cancel ()
401+             self .assertTrue (callback_finished .is_set ())
402+             self .loop .call_soon_threadsafe (self .loop .stop )
403+ 
404+         t  =  threading .Thread (target = run_in_thread )
405+         t .start ()
406+ 
407+         self .loop .run_forever ()
408+         t .join ()
409+         self .assertEqual (results , ['hello' ])
410+ 
411+     def  test_call_soon_threadsafe_handle_cancel_same_thread (self ):
412+         results  =  []
413+         callback_started  =  threading .Event ()
414+         callback_finished  =  threading .Event ()
415+ 
416+         fut  =  concurrent .futures .Future ()
417+         def  callback (arg ):
418+             callback_started .set ()
419+             handle  =  fut .result ()
420+             handle .cancel ()
421+             results .append (arg )
422+             callback_finished .set ()
423+             self .loop .stop ()
424+ 
425+         def  run_in_thread ():
426+             handle  =  self .loop .call_soon_threadsafe (callback , 'hello' )
427+             fut .set_result (handle )
428+             self .assertIsInstance (handle , events ._ThreadSafeHandle )
429+             callback_started .wait ()
430+             # callback cancels itself from same thread so it has no effect 
431+             # it runs to completion 
432+             self .assertTrue (handle .cancelled ())
433+             self .assertTrue (callback_finished .is_set ())
434+             self .loop .call_soon_threadsafe (self .loop .stop )
435+ 
436+         t  =  threading .Thread (target = run_in_thread )
437+         t .start ()
438+ 
439+         self .loop .run_forever ()
440+         t .join ()
441+         self .assertEqual (results , ['hello' ])
442+ 
443+     def  test_call_soon_threadsafe_handle_cancel_other_thread (self ):
444+         results  =  []
445+         ev  =  threading .Event ()
446+ 
447+         callback_finished  =  threading .Event ()
448+         def  callback (arg ):
449+             results .append (arg )
450+             callback_finished .set ()
451+             self .loop .stop ()
452+ 
453+         def  run_in_thread ():
454+             handle  =  self .loop .call_soon_threadsafe (callback , 'hello' )
455+             # handle can be cancelled from other thread if not started yet 
456+             self .assertIsInstance (handle , events ._ThreadSafeHandle )
457+             handle .cancel ()
458+             self .assertTrue (handle .cancelled ())
459+             self .assertFalse (callback_finished .is_set ())
460+             ev .set ()
461+             self .loop .call_soon_threadsafe (self .loop .stop )
462+ 
463+         # block the main loop until the callback is added and cancelled in the 
464+         # other thread 
465+         self .loop .call_soon (ev .wait )
466+         t  =  threading .Thread (target = run_in_thread )
467+         t .start ()
468+         self .loop .run_forever ()
469+         t .join ()
470+         self .assertEqual (results , [])
471+         self .assertFalse (callback_finished .is_set ())
472+ 
356473    def  test_call_soon_threadsafe_same_thread (self ):
357474        results  =  []
358475
0 commit comments