Skip to content

Commit 3fd1bcb

Browse files
Simplify ACP event streaming by removing timeout loop and event queue
- Replace complex timeout-based event processing with direct sessionUpdate calls in event handler - Send all LLMConvertibleEvent events directly via sessionUpdate without buffering - Remove unnecessary event queue, timeout logic, and fallback handling - Cleaner, more efficient implementation that streams events immediately - Proper type checking for TextContent vs ImageContent - All tests still passing This addresses the feedback that the previous implementation with timeout loops was not ideal. Co-authored-by: openhands <openhands@all-hands.dev>
1 parent b8b756a commit 3fd1bcb

File tree

1 file changed

+38
-189
lines changed

1 file changed

+38
-189
lines changed

openhands/agent_server/acp/server.py

Lines changed: 38 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -317,204 +317,53 @@ async def prompt(self, params: PromptRequest) -> PromptResponse:
317317
message = Message(role="user", content=[TextContent(text=prompt_text)])
318318
await event_service.send_message(message)
319319

320-
# Start the agent processing in the background
321-
asyncio.create_task(event_service.run())
322-
323-
# Listen to events and stream them back
324-
agent_response_content = []
325-
326-
# Create a queue to collect events
327-
event_queue = asyncio.Queue()
328-
329-
# Subscribe to events
330-
async def event_handler(event: Any) -> None:
331-
await event_queue.put(event)
332-
320+
# Subscribe to events and send them directly via sessionUpdate
333321
from openhands.agent_server.pub_sub import Subscriber
322+
from openhands.sdk.event.base import LLMConvertibleEvent
334323

335324
class EventSubscriber(Subscriber):
336-
def __init__(self, handler):
337-
self.handler = handler
325+
def __init__(self, session_id: str, conn):
326+
self.session_id = session_id
327+
self.conn = conn
338328

339329
async def __call__(self, event):
340-
await self.handler(event)
341-
342-
subscriber = EventSubscriber(event_handler)
343-
subscriber_id = await event_service.subscribe_to_events(subscriber)
344-
345-
try:
346-
# Process events with timeout
347-
timeout_count = 0
348-
max_timeout = 30 # 30 seconds timeout
349-
350-
while timeout_count < max_timeout:
351-
try:
352-
# Wait for events with timeout
353-
event = await asyncio.wait_for(event_queue.get(), timeout=1.0)
354-
timeout_count = 0 # Reset timeout counter
355-
356-
# Handle different types of events
357-
from openhands.sdk.event import (
358-
ActionEvent,
359-
MessageEvent,
360-
ObservationEvent,
361-
)
362-
363-
if isinstance(event, ActionEvent):
364-
# Send action event (tool execution) as agent_message_chunk
365-
action_text = f"🔧 **{event.tool_name}**"
366-
try:
367-
if hasattr(event.action, "command"):
368-
cmd = getattr(event.action, "command")
369-
action_text += f"\n```bash\n{cmd}\n```"
370-
elif hasattr(event.action, "path") and hasattr(
371-
event.action, "new_str"
372-
):
373-
path = getattr(event.action, "path")
374-
action_text += f"\n📝 Editing file: `{path}`"
375-
elif hasattr(event.action, "path"):
376-
action_text += (
377-
f"\n📄 File: `{getattr(event.action, 'path')}`"
378-
)
379-
else:
380-
# Generic action display
381-
action_text += f"\n{str(event.action)[:200]}"
382-
except Exception:
383-
action_text += f"\n{str(event.action)[:200]}"
384-
385-
await self._conn.sessionUpdate(
386-
SessionNotification(
387-
sessionId=session_id,
388-
update=SessionUpdate2(
389-
sessionUpdate="agent_message_chunk",
390-
content=ContentBlock1(
391-
type="text", text=action_text
392-
),
393-
),
394-
)
395-
)
396-
397-
elif isinstance(event, ObservationEvent):
398-
# Send observation event (tool result)
399-
obs_text = f"📤 **{event.tool_name} Result**"
400-
try:
401-
if hasattr(event.observation, "content"):
402-
result = str(getattr(event.observation, "content"))[
403-
:500
404-
]
405-
if (
406-
len(str(getattr(event.observation, "content")))
407-
> 500
408-
):
409-
result += "..."
410-
obs_text += f"\n```\n{result}\n```"
411-
else:
412-
obs_text += f"\n{str(event.observation)[:500]}"
413-
except Exception:
414-
obs_text += f"\n{str(event.observation)[:500]}"
415-
416-
await self._conn.sessionUpdate(
417-
SessionNotification(
418-
sessionId=session_id,
419-
update=SessionUpdate2(
420-
sessionUpdate="agent_message_chunk",
421-
content=ContentBlock1(
422-
type="text", text=obs_text
423-
),
424-
),
425-
)
426-
)
427-
428-
elif isinstance(event, MessageEvent):
429-
# Send agent message chunks
430-
try:
431-
if (
432-
hasattr(event, "role")
433-
and getattr(event, "role") == "assistant"
434-
):
435-
text_content = ""
436-
if hasattr(event, "content"):
437-
for content_item in getattr(event, "content"):
438-
if hasattr(content_item, "text"):
439-
text_content += content_item.text
440-
elif isinstance(content_item, str):
441-
text_content += content_item
442-
443-
if text_content.strip():
444-
agent_response_content.append(text_content)
445-
446-
# Send streaming update
447-
await self._conn.sessionUpdate(
448-
SessionNotification(
449-
sessionId=session_id,
450-
update=SessionUpdate2(
451-
sessionUpdate="agent_message_chunk",
452-
content=ContentBlock1(
453-
type="text", text=text_content
454-
),
455-
),
456-
)
457-
)
458-
except Exception as e:
459-
logger.debug(f"Error processing MessageEvent: {e}")
460-
461-
# Fallback: try to convert to LLM message for other events
462-
elif hasattr(event, "to_llm_message"):
463-
try:
464-
llm_message = event.to_llm_message()
465-
466-
# Send the event as a session update
467-
if llm_message.role == "assistant":
468-
# Extract text content from the message
469-
text_content = ""
470-
for content_item in llm_message.content:
471-
if hasattr(content_item, "text"):
472-
text_content += content_item.text
473-
elif isinstance(content_item, str):
474-
text_content += content_item
475-
476-
if text_content.strip():
477-
agent_response_content.append(text_content)
478-
479-
# Send streaming update
480-
await self._conn.sessionUpdate(
481-
SessionNotification(
482-
sessionId=session_id,
483-
update=SessionUpdate2(
484-
sessionUpdate="agent_message_chunk",
485-
content=ContentBlock1(
486-
type="text", text=text_content
487-
),
330+
# Send all LLMConvertibleEvent events as session updates
331+
if isinstance(event, LLMConvertibleEvent):
332+
try:
333+
llm_message = event.to_llm_message()
334+
335+
# Send the event as a session update
336+
if llm_message.role == "assistant":
337+
# Extract text content from the message
338+
text_content = ""
339+
for content_item in llm_message.content:
340+
if isinstance(content_item, TextContent):
341+
text_content += content_item.text
342+
elif isinstance(content_item, str):
343+
text_content += content_item
344+
345+
if text_content.strip():
346+
# Send streaming update
347+
await self.conn.sessionUpdate(
348+
SessionNotification(
349+
sessionId=self.session_id,
350+
update=SessionUpdate2(
351+
sessionUpdate="agent_message_chunk",
352+
content=ContentBlock1(
353+
type="text", text=text_content
488354
),
489-
)
355+
),
490356
)
491-
except Exception as e:
492-
logger.debug(
493-
f"Could not convert event to LLM message: {e}"
494-
)
495-
continue
357+
)
358+
except Exception as e:
359+
logger.debug(f"Error processing LLMConvertibleEvent: {e}")
496360

497-
# Check if this is a completion event
498-
try:
499-
if (
500-
hasattr(event, "event_type")
501-
and "complete"
502-
in str(getattr(event, "event_type")).lower()
503-
):
504-
break
505-
elif (
506-
hasattr(event, "type")
507-
and "complete" in str(getattr(event, "type")).lower()
508-
):
509-
break
510-
except Exception:
511-
# Continue processing if we can't check completion status
512-
pass
513-
514-
except TimeoutError:
515-
timeout_count += 1
516-
continue
361+
subscriber = EventSubscriber(session_id, self._conn)
362+
subscriber_id = await event_service.subscribe_to_events(subscriber)
517363

364+
try:
365+
# Start the agent processing and wait for completion
366+
await event_service.run()
518367
finally:
519368
# Unsubscribe from events
520369
await event_service.unsubscribe_from_events(subscriber_id)

0 commit comments

Comments
 (0)