@@ -1296,7 +1296,7 @@ def test_debug_namespace(tmpdir):
1296
1296
break
1297
1297
1298
1298
1299
- def test_interrupt ():
1299
+ def test_interrupt_short_loop ():
1300
1300
"""
1301
1301
Test that the kernel can be interrupted by calling a comm handler.
1302
1302
"""
@@ -1321,17 +1321,35 @@ def test_interrupt():
1321
1321
kernel_comm .remote_call ().raise_interrupt_signal ()
1322
1322
# Wait for shell message
1323
1323
while True :
1324
- assert time .time () - t0 < 5
1324
+ delta = time .time () - t0
1325
+ assert delta < 5
1325
1326
msg = client .get_shell_msg (timeout = TIMEOUT )
1326
1327
if msg ["parent_header" ].get ("msg_id" ) != msg_id :
1327
1328
# not from my request
1328
1329
continue
1329
1330
break
1330
- assert time .time () - t0 < 5
1331
+ delta = time .time () - t0
1332
+ assert delta < 5 , (
1333
+ "10 seconds long call should have been interrupted, so the "
1334
+ "interrupt signal was likely mishandled"
1335
+ )
1336
+
1337
+
1338
+ @pytest .mark .skipif (os .name == "nt" , reason = "Windows doesn't do 'interrupting sleep'" )
1339
+ def test_interrupt_long_sleep ():
1340
+ # Command to start the kernel
1341
+ cmd = "from spyder_kernels.console import start; start.main()"
1342
+ with setup_kernel (cmd ) as client :
1343
+ kernel_comm = CommBase ()
1344
+
1345
+ # Create new comm and send the highest protocol
1346
+ comm = Comm (kernel_comm ._comm_name , client )
1347
+ comm .open (data = {})
1348
+ comm ._send_channel = client .control_channel
1349
+ kernel_comm ._register_comm (comm )
1350
+
1351
+ client .execute_interactive ("import time" , timeout = TIMEOUT )
1331
1352
1332
- if os .name == 'nt' :
1333
- # Windows doesn't do "interrupting sleep"
1334
- return
1335
1353
1336
1354
# Try interrupting sleep
1337
1355
t0 = time .time ()
0 commit comments