Skip to content

Commit 10a751b

Browse files
fix: address fork() bugs found in code review
Fix four issues identified by @VascoSch92: 1. Path-doubling: fork() was computing fork_persistence as base/FORK_HEX, but __init__→get_persistence_dir() appends the hex again, producing base/FORK_HEX/FORK_HEX. Fix: pass only the base directory and let __init__ append the hex. 2. Missing FIFOLock: fork() reads mutable state (events, stats, agent_state, activated_knowledge_skills) without acquiring the state lock, risking torn reads during concurrent run(). Fix: wrap all state reads in with self._state:. 3. Orphaned persistence dir: if _start_event_service raises after fork persistence is written to disk, the directory is never cleaned up. Fix: add try/except with safe_rmtree rollback. 4. No duplicate-ID check: client-supplied fork ID could clobber an active conversation. Fix: reject duplicate IDs with ValueError, surface as HTTP 409 Conflict at the router layer. Add regression tests: - test_fork_persistence_path_no_doubling: verifies fork dir is a sibling of source, not nested - test_fork_persisted_events_survive_reload: end-to-end persistence round-trip (close fork, reopen from disk, verify events) - test_fork_conversation_duplicate_id_returns_409: router returns 409 for duplicate fork IDs Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 66a1707 commit 10a751b

File tree

5 files changed

+165
-51
lines changed

5 files changed

+165
-51
lines changed

openhands-agent-server/openhands/agent_server/conversation_router.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ async def condense_conversation(
400400
responses={
401401
201: {"description": "Forked conversation created"},
402402
404: {"description": "Source conversation not found"},
403+
409: {"description": "Fork ID already in use"},
403404
},
404405
status_code=status.HTTP_201_CREATED,
405406
)
@@ -414,15 +415,21 @@ async def fork_conversation(
414415
Calling ``run`` on the fork resumes from the copied state, meaning
415416
the agent has full event memory of the source conversation.
416417
"""
417-
info = await conversation_service.fork_conversation(
418-
conversation_id,
419-
fork_id=request.id,
420-
title=request.title,
421-
tags=request.tags if request.tags is not None else None,
422-
reset_metrics=request.reset_metrics,
423-
)
418+
try:
419+
info = await conversation_service.fork_conversation(
420+
conversation_id,
421+
fork_id=request.id,
422+
title=request.title,
423+
tags=request.tags if request.tags is not None else None,
424+
reset_metrics=request.reset_metrics,
425+
)
426+
except ValueError as exc:
427+
if "already exists" in str(exc):
428+
raise HTTPException(status.HTTP_409_CONFLICT, detail=str(exc)) from exc
429+
raise
424430
if info is None:
425431
raise HTTPException(
426-
status.HTTP_404_NOT_FOUND, detail="Source conversation not found"
432+
status.HTTP_404_NOT_FOUND,
433+
detail="Source conversation not found",
427434
)
428435
return info

openhands-agent-server/openhands/agent_server/conversation_service.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,10 +674,19 @@ async def fork_conversation(
674674
so the forked conversation is fully independent from the source.
675675
676676
Returns ``None`` when *source_id* does not exist.
677+
678+
Raises:
679+
ValueError: If *fork_id* is already taken by an active
680+
conversation.
677681
"""
678682
if self._event_services is None:
679683
raise ValueError("inactive_service")
680684

685+
# Reject duplicate fork IDs early to avoid clobbering an active
686+
# conversation or leaking an EventService reference.
687+
if fork_id is not None and fork_id in self._event_services:
688+
raise ValueError(f"Conversation with id {fork_id} already exists")
689+
681690
source_service = self._event_services.get(source_id)
682691
if source_service is None:
683692
return None
@@ -705,7 +714,14 @@ async def fork_conversation(
705714
agent=fork_agent,
706715
workspace=fork_workspace,
707716
)
708-
fork_event_service = await self._start_event_service(fork_stored)
717+
# If the service fails to start, clean up the orphaned persistence
718+
# directory so we don't leave stale state on disk.
719+
fork_dir = self.conversations_dir / fork_conv_id.hex
720+
try:
721+
fork_event_service = await self._start_event_service(fork_stored)
722+
except Exception:
723+
safe_rmtree(fork_dir)
724+
raise
709725

710726
state = await fork_event_service.get_state()
711727
return _compose_conversation_info_v1(fork_event_service.stored, state)

openhands-sdk/openhands/sdk/conversation/impl/local_conversation.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -348,54 +348,63 @@ def fork(
348348
self.agent.model_dump(context={"expose_secrets": True}),
349349
)
350350

351-
# Determine persistence_dir for the fork
352-
source_persistence = self._state.persistence_dir
353-
fork_persistence: str | None = None
354-
if source_persistence is not None:
355-
source_path = Path(source_persistence)
356-
fork_persistence = str(source_path.parent / fork_id.hex)
357-
358-
# Build the fork conversation (empty – no events yet)
359-
fork_conv = LocalConversation(
360-
agent=fork_agent,
361-
workspace=self.workspace,
362-
plugins=self._plugin_specs,
363-
persistence_dir=fork_persistence,
364-
conversation_id=fork_id,
365-
max_iteration_per_run=self.max_iteration_per_run,
366-
stuck_detection=self._stuck_detector is not None,
367-
visualizer=type(self._visualizer) if self._visualizer else None,
368-
delete_on_close=self.delete_on_close,
369-
tags=tags,
370-
)
351+
# Hold the state lock while reading mutable state from the source
352+
# conversation to avoid torn reads if run() is executing concurrently.
353+
with self._state:
354+
# Determine persistence_dir for the fork.
355+
# Pass the *base* directory only — __init__ calls
356+
# get_persistence_dir() which appends the conversation ID hex,
357+
# so we must not do that here.
358+
source_persistence = self._state.persistence_dir
359+
fork_persistence: str | None = None
360+
if source_persistence is not None:
361+
source_path = Path(source_persistence)
362+
fork_persistence = str(source_path.parent)
363+
364+
# Build the fork conversation (empty – no events yet)
365+
fork_conv = LocalConversation(
366+
agent=fork_agent,
367+
workspace=self.workspace,
368+
plugins=self._plugin_specs,
369+
persistence_dir=fork_persistence,
370+
conversation_id=fork_id,
371+
max_iteration_per_run=self.max_iteration_per_run,
372+
stuck_detection=self._stuck_detector is not None,
373+
visualizer=type(self._visualizer) if self._visualizer else None,
374+
delete_on_close=self.delete_on_close,
375+
tags=tags,
376+
)
371377

372-
# Deep-copy events from source → fork so the source stays immutable.
373-
for event in self._state.events:
374-
fork_conv._state.events.append(event.model_copy(deep=True))
375-
376-
# Copy runtime state that accumulated during the source conversation.
377-
# activated_knowledge_skills is list[str] – strings are immutable so a
378-
# shallow list copy is sufficient. agent_state can hold arbitrary
379-
# mutable values, so deep-copy it.
380-
fork_conv._state.activated_knowledge_skills = list(
381-
self._state.activated_knowledge_skills
382-
)
383-
fork_conv._state.agent_state = copy.deepcopy(self._state.agent_state)
378+
# Deep-copy events from source → fork so the source stays
379+
# immutable.
380+
for event in self._state.events:
381+
fork_conv._state.events.append(event.model_copy(deep=True))
382+
383+
# Copy runtime state that accumulated during the source
384+
# conversation. activated_knowledge_skills is list[str] – strings
385+
# are immutable so a shallow list copy is sufficient.
386+
# agent_state can hold arbitrary mutable values, so deep-copy it.
387+
fork_conv._state.activated_knowledge_skills = list(
388+
self._state.activated_knowledge_skills
389+
)
390+
fork_conv._state.agent_state = copy.deepcopy(self._state.agent_state)
391+
392+
# Copy title via tags if provided
393+
if title is not None:
394+
fork_conv._state.tags = {
395+
**fork_conv._state.tags,
396+
"title": title,
397+
}
384398

385-
# Copy title via tags if provided
386-
if title is not None:
387-
fork_conv._state.tags = {
388-
**fork_conv._state.tags,
389-
"title": title,
390-
}
399+
# Reset or copy metrics
400+
if not reset_metrics:
401+
fork_conv._state.stats = self._state.stats.model_copy(deep=True)
391402

392-
# Reset or copy metrics
393-
if not reset_metrics:
394-
fork_conv._state.stats = self._state.stats.model_copy(deep=True)
403+
event_count = len(self._state.events)
395404

396405
logger.info(
397406
f"Forked conversation {self.id}{fork_id} "
398-
f"({len(self._state.events)} events copied, "
407+
f"({event_count} events copied, "
399408
f"reset_metrics={reset_metrics})"
400409
)
401410
return fork_conv

tests/agent_server/test_conversation_router.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,3 +1774,26 @@ def test_fork_conversation_not_found(
17741774
assert response.status_code == 404
17751775
finally:
17761776
client.app.dependency_overrides.clear()
1777+
1778+
1779+
def test_fork_conversation_duplicate_id_returns_409(
1780+
client, mock_conversation_service, sample_conversation_id
1781+
):
1782+
"""Test fork returns 409 when the requested fork ID already exists."""
1783+
mock_conversation_service.fork_conversation.side_effect = ValueError(
1784+
f"Conversation with id {sample_conversation_id} already exists"
1785+
)
1786+
1787+
client.app.dependency_overrides[get_conversation_service] = (
1788+
lambda: mock_conversation_service
1789+
)
1790+
1791+
try:
1792+
response = client.post(
1793+
f"/api/conversations/{sample_conversation_id}/fork",
1794+
json={"id": str(sample_conversation_id)},
1795+
)
1796+
1797+
assert response.status_code == 409
1798+
finally:
1799+
client.app.dependency_overrides.clear()

tests/sdk/conversation/local/test_fork.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import tempfile
44
import uuid
5+
from pathlib import Path
56

67
import pytest
78
from pydantic import SecretStr
@@ -191,3 +192,61 @@ def test_fork_event_deep_copy_isolation():
191192
assert fork_evt.llm_message.content[0].text == "original" # type: ignore[union-attr]
192193
fork_evt.llm_message.content[0].text = "mutated" # type: ignore[union-attr]
193194
assert src_evt.llm_message.content[0].text == "original" # type: ignore[union-attr]
195+
196+
197+
def test_fork_persistence_path_no_doubling():
198+
"""Fork persistence dir must be a sibling of source, not nested inside it.
199+
200+
Regression test: fork() previously computed the persistence path with
201+
the conversation hex appended, but __init__ also appends it via
202+
get_persistence_dir(), leading to /base/FORK_HEX/FORK_HEX.
203+
"""
204+
with tempfile.TemporaryDirectory() as tmpdir:
205+
src = Conversation(agent=_agent(), persistence_dir=tmpdir, workspace=tmpdir)
206+
fork = src.fork()
207+
208+
assert src._state.persistence_dir is not None
209+
assert fork._state.persistence_dir is not None
210+
src_path = Path(src._state.persistence_dir)
211+
fork_path = Path(fork._state.persistence_dir)
212+
213+
# Both should live directly under the same base directory
214+
assert src_path.parent == fork_path.parent
215+
# The fork dir should be <base>/<fork_id_hex>, not doubled
216+
assert fork_path.name == fork.id.hex
217+
218+
219+
def test_fork_persisted_events_survive_reload():
220+
"""Events persisted by fork() should be loadable from the fork dir.
221+
222+
This validates the path-doubling fix end-to-end: if the fork wrote
223+
events to the wrong directory, resuming from the correct path would
224+
see zero events.
225+
"""
226+
# Event IDs must be hex+dash, ≥8 chars to match EVENT_NAME_RE.
227+
evt_id_1 = uuid.uuid4().hex
228+
evt_id_2 = uuid.uuid4().hex
229+
230+
with tempfile.TemporaryDirectory() as tmpdir:
231+
src = Conversation(agent=_agent(), persistence_dir=tmpdir, workspace=tmpdir)
232+
src.state.events.append(_msg(evt_id_1, "hello"))
233+
src.state.events.append(_msg(evt_id_2, "world"))
234+
235+
fork = src.fork()
236+
fork_id = fork.id
237+
238+
# The fork should have the events in-memory
239+
assert len(fork.state.events) == 2
240+
241+
# Close the fork to flush persistence, then reopen from disk
242+
fork.close()
243+
244+
resumed = Conversation(
245+
agent=_agent(),
246+
persistence_dir=tmpdir,
247+
workspace=tmpdir,
248+
conversation_id=fork_id,
249+
)
250+
resumed_ids = [e.id for e in resumed.state.events]
251+
assert evt_id_1 in resumed_ids
252+
assert evt_id_2 in resumed_ids

0 commit comments

Comments
 (0)