@@ -353,6 +353,124 @@ def run_in_thread():
353353 t .join ()
354354 self .assertEqual (results , ['hello' , 'world' ])
355355
356+ def test_call_soon_threadsafe_handle_block_check_cancelled (self ):
357+ results = []
358+
359+ callback_started = threading .Event ()
360+ callback_finished = threading .Event ()
361+ def callback (arg ):
362+ callback_started .set ()
363+ results .append (arg )
364+ time .sleep (1 )
365+ callback_finished .set ()
366+
367+ def run_in_thread ():
368+ handle = self .loop .call_soon_threadsafe (callback , 'hello' )
369+ self .assertIsInstance (handle , events ._ThreadSafeHandle )
370+ callback_started .wait ()
371+ # callback started so it should block checking for cancellation
372+ # until it finishes
373+ self .assertFalse (handle .cancelled ())
374+ self .assertTrue (callback_finished .is_set ())
375+ self .loop .call_soon_threadsafe (self .loop .stop )
376+
377+ t = threading .Thread (target = run_in_thread )
378+ t .start ()
379+
380+ self .loop .run_forever ()
381+ t .join ()
382+ self .assertEqual (results , ['hello' ])
383+
384+ def test_call_soon_threadsafe_handle_block_cancellation (self ):
385+ results = []
386+
387+ callback_started = threading .Event ()
388+ callback_finished = threading .Event ()
389+ def callback (arg ):
390+ callback_started .set ()
391+ results .append (arg )
392+ time .sleep (1 )
393+ callback_finished .set ()
394+
395+ def run_in_thread ():
396+ handle = self .loop .call_soon_threadsafe (callback , 'hello' )
397+ self .assertIsInstance (handle , events ._ThreadSafeHandle )
398+ callback_started .wait ()
399+ # callback started so it cannot be cancelled from other thread until
400+ # it finishes
401+ handle .cancel ()
402+ self .assertTrue (callback_finished .is_set ())
403+ self .loop .call_soon_threadsafe (self .loop .stop )
404+
405+ t = threading .Thread (target = run_in_thread )
406+ t .start ()
407+
408+ self .loop .run_forever ()
409+ t .join ()
410+ self .assertEqual (results , ['hello' ])
411+
412+ def test_call_soon_threadsafe_handle_cancel_same_thread (self ):
413+ results = []
414+ callback_started = threading .Event ()
415+ callback_finished = threading .Event ()
416+
417+ fut = concurrent .futures .Future ()
418+ def callback (arg ):
419+ callback_started .set ()
420+ handle = fut .result ()
421+ handle .cancel ()
422+ results .append (arg )
423+ callback_finished .set ()
424+ self .loop .stop ()
425+
426+ def run_in_thread ():
427+ handle = self .loop .call_soon_threadsafe (callback , 'hello' )
428+ fut .set_result (handle )
429+ self .assertIsInstance (handle , events ._ThreadSafeHandle )
430+ callback_started .wait ()
431+ # callback cancels itself from same thread so it has no effect
432+ # it runs to completion
433+ self .assertTrue (handle .cancelled ())
434+ self .assertTrue (callback_finished .is_set ())
435+ self .loop .call_soon_threadsafe (self .loop .stop )
436+
437+ t = threading .Thread (target = run_in_thread )
438+ t .start ()
439+
440+ self .loop .run_forever ()
441+ t .join ()
442+ self .assertEqual (results , ['hello' ])
443+
444+ def test_call_soon_threadsafe_handle_cancel_other_thread (self ):
445+ results = []
446+ ev = threading .Event ()
447+
448+ callback_finished = threading .Event ()
449+ def callback (arg ):
450+ results .append (arg )
451+ callback_finished .set ()
452+ self .loop .stop ()
453+
454+ def run_in_thread ():
455+ handle = self .loop .call_soon_threadsafe (callback , 'hello' )
456+ # handle can be cancelled from other thread if not started yet
457+ self .assertIsInstance (handle , events ._ThreadSafeHandle )
458+ handle .cancel ()
459+ self .assertTrue (handle .cancelled ())
460+ self .assertFalse (callback_finished .is_set ())
461+ ev .set ()
462+ self .loop .call_soon_threadsafe (self .loop .stop )
463+
464+ # block the main loop until the callback is added and cancelled in the
465+ # other thread
466+ self .loop .call_soon (ev .wait )
467+ t = threading .Thread (target = run_in_thread )
468+ t .start ()
469+ self .loop .run_forever ()
470+ t .join ()
471+ self .assertEqual (results , [])
472+ self .assertFalse (callback_finished .is_set ())
473+
356474 def test_call_soon_threadsafe_same_thread (self ):
357475 results = []
358476
0 commit comments