@@ -125,6 +125,58 @@ def start_kernel_with_retry(session, *, retries=5, delay=1.0, **kwargs):
125125 raise last_err
126126
127127
def shutdown_and_start_kernel(
    session,
    *,
    env_source,
    retries=5,
    delay=1.0,
    shutdown_attempts=3,
    shutdown_delay=0.5,
    **kwargs,
):
    """Shut down any auto-launched kernel, then start one with ``env_source``.

    The daemon auto-launches a prewarmed kernel when create_notebook() is
    called. This races with an explicit start_kernel(env_source=...): if the
    auto-launched kernel wins, start_kernel reports the already-running kernel
    with the wrong env_source instead of raising, so start_kernel_with_retry
    cannot detect the problem.

    This helper:
      1. Calls shutdown_kernel() several times, because the auto-launch may
         not have produced a kernel yet when the first shutdown arrives.
      2. Starts the kernel with the desired env_source.
      3. Verifies ``session.env_source`` matches; if not, shuts the kernel
         down and tries again.

    Args:
        session: Object exposing ``shutdown_kernel()``, ``start_kernel()`` and
            an ``env_source`` attribute.
        env_source: The env_source the started kernel must report.
        retries: Maximum number of start attempts.
        delay: Seconds to wait between start attempts.
        shutdown_attempts: Number of up-front shutdown calls (phase 1).
        shutdown_delay: Seconds slept after each up-front shutdown call.
        **kwargs: Forwarded unchanged to ``session.start_kernel()``.

    Returns:
        None, once a kernel with the requested env_source is running.

    Raises:
        runtimed.RuntimedError: If the last attempt failed inside start_kernel.
        RuntimeError: If the last attempt started a kernel whose env_source
            differs from the requested one.
        Exception: With message "max retries exceeded" if ``retries`` is 0.
    """
    # Phase 1: shut down the auto-launched kernel. The auto-launch runs in the
    # background, so early shutdown calls may find nothing to stop; repeating
    # the call with a short pause catches the kernel once it exists.
    for _ in range(shutdown_attempts):
        try:
            session.shutdown_kernel()
        except Exception:
            # Deliberate best effort: a failed shutdown (for example when no
            # kernel exists yet) is expected here and must not abort setup.
            pass
        time.sleep(shutdown_delay)

    # Phase 2: start the kernel and verify that env_source matches.
    # last_err always describes the most recent failed attempt, so the error
    # raised at the end explains why the final attempt did not succeed.
    last_err: Exception = Exception("max retries exceeded")
    for attempt in range(retries):
        attempts_remain = attempt < retries - 1
        try:
            session.start_kernel(env_source=env_source, **kwargs)
        except runtimed.RuntimedError as e:
            last_err = e
        else:
            # start_kernel did not raise, but an already-running kernel may
            # have been reported back with its own env_source.
            actual = session.env_source
            if actual == env_source:
                return

            last_err = RuntimeError(
                f"kernel started with env_source={actual!r}, "
                f"expected {env_source!r}"
            )
            # Wrong env_source: a stale auto-launched kernel is still running.
            # Shut it down (best effort) so the next attempt can replace it.
            try:
                session.shutdown_kernel()
            except Exception:
                pass

        # Only wait when another attempt follows; sleeping before the final
        # raise would just slow down the failure.
        if attempts_remain:
            time.sleep(delay)

    raise last_err
178+
179+
128180async def async_start_kernel_with_retry (session , * , retries = 5 , delay = 1.0 , ** kwargs ):
129181 """Async retry wrapper for start_kernel (tolerates connection timeouts on CI)."""
130182 last_err : Exception = Exception ("max retries exceeded" )
@@ -902,11 +954,6 @@ class TestOutputHandling:
902954 execution stops when an error is raised.
903955 """
904956
905- @pytest .mark .xfail (
906- reason = "Sync race: create_cell + execute_cell in quick succession may execute "
907- "before source is synced to daemon. See #875 discussion." ,
908- strict = False ,
909- )
910957 def test_output_types_and_error_stops_execution (self , session ):
911958 """Test stream, display, error outputs and verify error stops execution.
912959
@@ -931,6 +978,11 @@ def test_output_types_and_error_stops_execution(self, session):
931978 cell3 = session .create_cell ('raise ValueError("better see this")' )
932979 cell4 = session .create_cell ('print("this better not run")' )
933980
981+ # Let CRDT sync propagate cell sources to the daemon before executing.
982+ # Under broadcast pressure (kernel warmup, runtime state updates),
983+ # confirm_sync's best-effort fallback can fire prematurely.
984+ time .sleep (0.5 )
985+
934986 # Execute cell 2: display data
935987 result2 = session .execute_cell (cell2 )
936988 assert result2 .success , f"Cell 2 should succeed: { result2 .error } "
@@ -1148,8 +1200,8 @@ def test_uv_inline_deps_trusted(self, session):
11481200 """
11491201 _set_python_kernelspec (session , uv_deps = ["requests" ])
11501202
1151- # Retry: metadata may not have synced to the daemon's Automerge doc yet
1152- start_kernel_with_retry (session , kernel_type = "python" , env_source = "uv:inline" )
1203+ # Shut down the auto-launched prewarmed kernel, then start with uv:inline
1204+ shutdown_and_start_kernel (session , kernel_type = "python" , env_source = "uv:inline" )
11531205
11541206 assert session .env_source == "uv:inline"
11551207
@@ -1163,8 +1215,8 @@ def test_uv_inline_deps_env_has_python(self, session):
11631215 """UV inline env actually has a working Python with the declared deps."""
11641216 _set_python_kernelspec (session , uv_deps = ["requests" ])
11651217
1166- # Retry: metadata may not have synced to the daemon's Automerge doc yet
1167- start_kernel_with_retry (session , kernel_type = "python" , env_source = "uv:inline" )
1218+ # Shut down the auto-launched prewarmed kernel, then start with uv:inline
1219+ shutdown_and_start_kernel (session , kernel_type = "python" , env_source = "uv:inline" )
11681220
11691221 # sys.prefix should point to a venv, not the system Python
11701222 result = session .run ("import sys; print(sys.prefix)" )
@@ -1265,24 +1317,12 @@ def conda_inline_session(self, daemon_process):
12651317 client = runtimed .Client (socket_path = str (socket_path )) if socket_path else runtimed .Client ()
12661318 sess = client .create_notebook (runtime = "python" )
12671319
1268- # Shutdown the auto-launched Python kernel so we can re-launch
1269- # with conda:inline env_source (the daemon returns
1270- # KernelAlreadyRunning if a kernel is already up).
1271- try :
1272- sess .shutdown_kernel ()
1273- except Exception :
1274- pass
1275-
12761320 # Set up conda inline deps metadata using typed API
12771321 _set_python_kernelspec (sess , conda_deps = ["filelock" ])
12781322
1279- # Extra delay: conda:inline metadata must propagate to the daemon's
1280- # Automerge doc before start_kernel reads it. The retry helper covers
1281- # transient failures but the class-scoped fixture only runs once.
1282- time .sleep (2.0 )
1283-
1284- # Start kernel once for all tests in class (longer retry for conda env creation)
1285- start_kernel_with_retry (
1323+ # Shut down the auto-launched prewarmed kernel and start with conda:inline.
1324+ # Uses longer retries because conda env creation can be slow.
1325+ shutdown_and_start_kernel (
12861326 sess ,
12871327 kernel_type = "python" ,
12881328 env_source = "conda:inline" ,
@@ -1605,6 +1645,13 @@ async def test_async_execute_cell_reads_from_document(self, async_session):
16051645 await async_start_kernel_with_retry (async_session )
16061646
16071647 cell_id = await async_session .create_cell ("result = 2 + 2; print(result)" )
1648+
1649+ # Brief pause for CRDT sync: under broadcast pressure (kernel warmup,
1650+ # runtime state updates), confirm_sync's 5-round budget can be consumed
1651+ # by non-sync frames, causing the best-effort fallback to fire before
1652+ # the cell source propagates to the daemon.
1653+ await asyncio .sleep (0.5 )
1654+
16081655 result = await async_session .execute_cell (cell_id )
16091656
16101657 assert result .success
@@ -1806,6 +1853,7 @@ async def test_async_syntax_error(self, async_session):
18061853 assert warmup_result .success
18071854
18081855 cell_id = await async_session .create_cell ("if True print('broken')" )
1856+ await asyncio .sleep (0.5 )
18091857 result = await async_session .execute_cell (cell_id )
18101858
18111859 assert not result .success
@@ -1861,6 +1909,7 @@ async def test_stream_execute_yields_events(self, async_session):
18611909 await async_start_kernel_with_retry (async_session )
18621910
18631911 cell_id = await async_session .create_cell ("for i in range(3): print(f'line {i}')" )
1912+ await asyncio .sleep (0.5 )
18641913
18651914 events = []
18661915 async for event in await async_session .stream_execute (cell_id ):
@@ -1883,6 +1932,7 @@ async def test_stream_execute_has_output_events(self, async_session):
18831932 await async_start_kernel_with_retry (async_session )
18841933
18851934 cell_id = await async_session .create_cell ("print('first'); print('second')" )
1935+ await asyncio .sleep (0.5 )
18861936
18871937 output_events = []
18881938 async for event in await async_session .stream_execute (cell_id ):
@@ -1898,25 +1948,28 @@ async def test_stream_execute_has_output_events(self, async_session):
18981948
@pytest.mark.asyncio
async def test_stream_execute_error_in_output(self, async_session):
    """A raised Python exception shows up as an "error" output event.

    The cell raises ValueError; the test consumes the events produced by
    stream_execute() and checks that at least one "output" event carries an
    output whose output_type is "error", with the expected ename and evalue.
    The check is made on streamed events rather than on an execute_cell()
    result.
    """
    await async_start_kernel_with_retry(async_session)

    cell_id = await async_session.create_cell("raise ValueError('test error')")
    # Brief pause so the new cell's source can sync before it is executed.
    await asyncio.sleep(0.5)

    event_stream = await async_session.stream_execute(cell_id)
    error_events = []
    async for evt in event_stream:
        # Only "output" events can carry the error payload.
        if evt.event_type != "output":
            continue
        payload = evt.output
        if payload and payload.output_type == "error":
            error_events.append(evt)

    # At least one error output must have been streamed.
    assert len(error_events) >= 1, "Expected error output event from ValueError"
    first_error = error_events[0].output
    assert first_error.ename == "ValueError"
    assert "test error" in (first_error.evalue or "")
19201973
19211974
19221975# ============================================================================
0 commit comments