Skip to content

Commit 612d50a

Browse files
authored
fix: agent responses missing in UI due to SSE race condition (#1082)
* fix: add fallback bubble when final response arrives before status updates When an agent responds via A2A, status_update and final_response events travel on different broker topics with no ordering guarantee. If the final_response arrives first, the ChatProvider has no message bubble to mark complete and the user sees no response. The final task response already contains the complete text in result.status.message.parts. This adds a fallback that creates a message bubble from those parts when no bubble was created from status updates. * fix: defer SSE close to context cleanup to prevent dropping status updates The gateway's _send_final_response_to_external closed SSE connections immediately in its finally block, before context cleanup. Any status updates still queued in internal_event_queue would be dropped because the SSE connection was already closed. Move close_all_for_task to a new _close_external_connections hook called during context cleanup, after the final event is fully processed. This ensures pending status updates in the queue can still be delivered via SSE before the connection closes. * fix: close SSE connections in error paths of _handle_agent_event Two error paths in _handle_agent_event call _send_error_to_external then remove_context directly, bypassing the _close_external_connections hook. This leaks SSE connections when these error paths are hit. * ci: retrigger pipeline * test: verify SSE connection cleanup in _handle_agent_event error paths Tests that _close_external_connections is called after _send_error_to_external and before remove_context in both error paths. Uses real TaskContextManager with ordering assertions to catch the exact bug where connections leaked.
1 parent 2cc2563 commit 612d50a

File tree

4 files changed

+327
-12
lines changed

4 files changed

+327
-12
lines changed

client/webui/frontend/src/lib/providers/ChatProvider.tsx

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,6 +1510,27 @@ export const ChatProvider: React.FC<ChatProviderProps> = ({ children }) => {
15101510
isComplete: true,
15111511
metadata: { ...newMessages[taskMessageIndex].metadata, lastProcessedEventSequence: currentEventSequence },
15121512
};
1513+
} else if (result.kind === "task" && result.status?.state !== "failed" && result.status?.message?.parts) {
1514+
// Fallback: the final response arrived before any status updates
1515+
// (race condition between response and status broker topics).
1516+
// Create a bubble from the final response's content.
1517+
const fallbackParts = (result.status.message.parts as PartFE[]).filter(
1518+
(p: PartFE) => p.kind === "text" || p.kind === "file"
1519+
);
1520+
if (fallbackParts.length > 0) {
1521+
newMessages.push({
1522+
role: "agent",
1523+
parts: fallbackParts,
1524+
taskId: currentTaskIdFromResult,
1525+
isUser: false,
1526+
isComplete: true,
1527+
metadata: {
1528+
messageId: rpcResponse.id?.toString() || `msg-${v4()}`,
1529+
sessionId: (result as unknown as { contextId?: string }).contextId || sessionId,
1530+
lastProcessedEventSequence: currentEventSequence,
1531+
},
1532+
});
1533+
}
15131534
}
15141535
}
15151536

src/solace_agent_mesh/gateway/base/component.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1982,10 +1982,11 @@ async def _process_parsed_a2a_event(
19821982

19831983
if is_truly_final_event_for_context_cleanup:
19841984
log.info(
1985-
"%s Truly final event processed for task %s. Removing context.",
1985+
"%s Truly final event processed for task %s. Closing connections and removing context.",
19861986
log_id_prefix,
19871987
a2a_task_id,
19881988
)
1989+
await self._close_external_connections(external_request_context)
19891990
self.task_context_manager.remove_context(a2a_task_id)
19901991
self.task_context_manager.remove_context(f"{a2a_task_id}_stream_buffer")
19911992

@@ -2072,6 +2073,7 @@ async def _handle_agent_event(
20722073
code=-32000, message="Invalid event structure received from agent."
20732074
)
20742075
await self._send_error_to_external(external_request_context, generic_error)
2076+
await self._close_external_connections(external_request_context)
20752077
self.task_context_manager.remove_context(task_id_from_topic)
20762078
self.task_context_manager.remove_context(
20772079
f"{task_id_from_topic}_stream_buffer"
@@ -2097,6 +2099,7 @@ async def _handle_agent_event(
20972099
code=-32000, message=f"Gateway processing error: {e}"
20982100
)
20992101
await self._send_error_to_external(external_request_context, error_obj)
2102+
await self._close_external_connections(external_request_context)
21002103
self.task_context_manager.remove_context(task_id_from_topic)
21012104
self.task_context_manager.remove_context(
21022105
f"{task_id_from_topic}_stream_buffer"
@@ -2333,6 +2336,17 @@ async def _send_error_to_external(
23332336
) -> None:
23342337
pass
23352338

2339+
async def _close_external_connections(
2340+
self, external_request_context: Dict[str, Any]
2341+
) -> None:
2342+
"""Close external connections (e.g., SSE) during context cleanup.
2343+
2344+
Called after the final event is fully processed, ensuring any
2345+
pending status updates are delivered before the connection closes.
2346+
Subclasses should override to perform transport-specific cleanup.
2347+
"""
2348+
pass
2349+
23362350
def _detect_gateway_type(self) -> str:
23372351
"""Auto-detect gateway type from component class or configuration."""
23382352
configured_type = self.get_config("gateway_type")

src/solace_agent_mesh/gateway/http_sse/component.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,14 +2349,6 @@ async def _send_final_response_to_external(
23492349
a2a_task_id,
23502350
e,
23512351
)
2352-
finally:
2353-
await self.sse_manager.close_all_for_task(sse_task_id)
2354-
log.info(
2355-
"%s Closed SSE connections for SSE Task ID %s.",
2356-
log_id_prefix,
2357-
sse_task_id,
2358-
)
2359-
23602352

23612353
async def _send_error_to_external(
23622354
self, external_request_context: dict[str, Any], error_data: JSONRPCError
@@ -2403,10 +2395,19 @@ async def _send_error_to_external(
24032395
sse_task_id,
24042396
e,
24052397
)
2406-
finally:
2398+
2399+
async def _close_external_connections(self, external_request_context: dict) -> None:
2400+
"""Close SSE connections during context cleanup.
2401+
2402+
Called by the base gateway after the final event is processed,
2403+
ensuring any pending status updates in the queue are sent before
2404+
the SSE connection is closed.
2405+
"""
2406+
sse_task_id = external_request_context.get("a2a_task_id_for_event")
2407+
if sse_task_id:
24072408
await self.sse_manager.close_all_for_task(sse_task_id)
24082409
log.info(
2409-
"%s Closed SSE connections for SSE Task ID %s after error.",
2410-
log_id_prefix,
2410+
"%s Closed SSE connections for SSE Task ID %s during context cleanup.",
2411+
self.log_identifier,
24112412
sse_task_id,
24122413
)
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
"""
2+
Unit tests for connection cleanup in BaseGatewayComponent._handle_agent_event.
3+
4+
Verifies that _close_external_connections is called in error paths before
5+
remove_context, preventing SSE connection leaks.
6+
"""
7+
8+
import pytest
9+
from unittest.mock import AsyncMock, Mock
10+
11+
from a2a.types import (
12+
JSONRPCResponse,
13+
JSONRPCError,
14+
Task,
15+
TaskState,
16+
TaskStatus,
17+
TaskStatusUpdateEvent,
18+
)
19+
20+
from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
21+
from solace_agent_mesh.gateway.base.task_context import TaskContextManager
22+
23+
24+
TASK_ID = "test-task-123"
25+
RPC_ID = "rpc-001"
26+
27+
28+
def _build_component():
29+
"""Build a mock BaseGatewayComponent with real TaskContextManager."""
30+
component = Mock(spec=BaseGatewayComponent)
31+
component.log_identifier = "[TestGateway]"
32+
component.task_context_manager = TaskContextManager()
33+
component._send_error_to_external = AsyncMock()
34+
component._close_external_connections = AsyncMock()
35+
component._process_parsed_a2a_event = AsyncMock()
36+
component._handle_agent_event = (
37+
BaseGatewayComponent._handle_agent_event.__get__(component)
38+
)
39+
return component
40+
41+
42+
def _store_context(component, task_id=TASK_ID):
43+
"""Store a minimal external request context for the given task."""
44+
ctx = {"sse_task_id": task_id, "connection": "mock-sse-conn"}
45+
component.task_context_manager.store_context(task_id, ctx)
46+
return ctx
47+
48+
49+
def _valid_rpc_response_payload(task_id=TASK_ID, rpc_id=RPC_ID):
50+
"""Build a valid JSONRPCResponse payload with a TaskStatusUpdateEvent result."""
51+
event = TaskStatusUpdateEvent(
52+
task_id=task_id,
53+
context_id=task_id,
54+
final=False,
55+
status=TaskStatus(state=TaskState.working),
56+
)
57+
return JSONRPCResponse(id=rpc_id, result=event).model_dump(mode="json")
58+
59+
60+
def _rpc_response_with_null_result(rpc_id=RPC_ID):
61+
"""Build a JSONRPCResponse where both result and error are None."""
62+
return {"jsonrpc": "2.0", "id": rpc_id, "result": None}
63+
64+
65+
def _rpc_response_with_error(rpc_id=RPC_ID):
66+
"""Build a JSONRPCResponse carrying an error."""
67+
error = JSONRPCError(code=-32000, message="Agent exploded")
68+
return JSONRPCResponse(id=rpc_id, error=error).model_dump(mode="json")
69+
70+
71+
def _rpc_response_with_task_id_mismatch(rpc_id=RPC_ID):
72+
"""Build a JSONRPCResponse whose inner task_id mismatches the topic task_id."""
73+
event = TaskStatusUpdateEvent(
74+
task_id="wrong-task-id",
75+
context_id="wrong-task-id",
76+
final=False,
77+
status=TaskStatus(state=TaskState.working),
78+
)
79+
return JSONRPCResponse(id=rpc_id, result=event).model_dump(mode="json")
80+
81+
82+
class TestHandleAgentEventConnectionCleanup:
83+
"""Verify _close_external_connections is called in all error paths."""
84+
85+
@pytest.mark.asyncio
86+
async def test_null_result_closes_connections_before_removing_context(self):
87+
"""Error path 1: result is None — connections must close before context removal."""
88+
component = _build_component()
89+
_store_context(component)
90+
91+
result = await component._handle_agent_event(
92+
"topic/response", _rpc_response_with_null_result(), TASK_ID
93+
)
94+
95+
assert result is False
96+
component._send_error_to_external.assert_called_once()
97+
component._close_external_connections.assert_called_once()
98+
assert component.task_context_manager.get_context(TASK_ID) is None
99+
100+
@pytest.mark.asyncio
101+
async def test_task_id_mismatch_closes_connections_before_removing_context(self):
102+
"""Error path 1 variant: task_id mismatch nullifies parsed event."""
103+
component = _build_component()
104+
_store_context(component)
105+
106+
result = await component._handle_agent_event(
107+
"topic/response", _rpc_response_with_task_id_mismatch(), TASK_ID
108+
)
109+
110+
assert result is False
111+
component._send_error_to_external.assert_called_once()
112+
component._close_external_connections.assert_called_once()
113+
assert component.task_context_manager.get_context(TASK_ID) is None
114+
115+
@pytest.mark.asyncio
116+
async def test_process_event_exception_closes_connections_before_removing_context(self):
117+
"""Error path 2: _process_parsed_a2a_event raises — connections must close."""
118+
component = _build_component()
119+
_store_context(component)
120+
component._process_parsed_a2a_event.side_effect = RuntimeError("boom")
121+
122+
result = await component._handle_agent_event(
123+
"topic/response", _valid_rpc_response_payload(), TASK_ID
124+
)
125+
126+
assert result is False
127+
component._send_error_to_external.assert_called_once()
128+
component._close_external_connections.assert_called_once()
129+
assert component.task_context_manager.get_context(TASK_ID) is None
130+
131+
@pytest.mark.asyncio
132+
async def test_close_called_after_send_error_and_before_context_gone(self):
133+
"""Verify ordering: send_error → close_connections → context removed."""
134+
component = _build_component()
135+
_store_context(component)
136+
137+
call_order = []
138+
component._send_error_to_external.side_effect = (
139+
lambda *a, **kw: call_order.append("send_error")
140+
)
141+
142+
original_close = component._close_external_connections
143+
144+
async def track_close(*a, **kw):
145+
assert component.task_context_manager.get_context(TASK_ID) is not None, (
146+
"Context was removed before _close_external_connections"
147+
)
148+
call_order.append("close_connections")
149+
return await original_close(*a, **kw)
150+
151+
component._close_external_connections = track_close
152+
153+
await component._handle_agent_event(
154+
"topic/response", _rpc_response_with_null_result(), TASK_ID
155+
)
156+
157+
assert call_order == ["send_error", "close_connections"]
158+
assert component.task_context_manager.get_context(TASK_ID) is None
159+
160+
161+
class TestHandleAgentEventConnectionCleanupOnProcessException:
162+
"""Same ordering guarantees for the _process_parsed_a2a_event exception path."""
163+
164+
@pytest.mark.asyncio
165+
async def test_close_called_after_send_error_and_before_context_gone(self):
166+
"""Verify ordering in exception path: send_error → close → context removed."""
167+
component = _build_component()
168+
_store_context(component)
169+
component._process_parsed_a2a_event.side_effect = ValueError("bad data")
170+
171+
call_order = []
172+
component._send_error_to_external.side_effect = (
173+
lambda *a, **kw: call_order.append("send_error")
174+
)
175+
176+
original_close = component._close_external_connections
177+
178+
async def track_close(*a, **kw):
179+
assert component.task_context_manager.get_context(TASK_ID) is not None, (
180+
"Context was removed before _close_external_connections"
181+
)
182+
call_order.append("close_connections")
183+
return await original_close(*a, **kw)
184+
185+
component._close_external_connections = track_close
186+
187+
await component._handle_agent_event(
188+
"topic/response", _valid_rpc_response_payload(), TASK_ID
189+
)
190+
191+
assert call_order == ["send_error", "close_connections"]
192+
assert component.task_context_manager.get_context(TASK_ID) is None
193+
194+
@pytest.mark.asyncio
195+
async def test_stream_buffer_context_also_removed(self):
196+
"""Both task context and stream buffer context should be cleaned up."""
197+
component = _build_component()
198+
_store_context(component)
199+
component.task_context_manager.store_context(
200+
f"{TASK_ID}_stream_buffer", {"buffer": []}
201+
)
202+
component._process_parsed_a2a_event.side_effect = RuntimeError("boom")
203+
204+
await component._handle_agent_event(
205+
"topic/response", _valid_rpc_response_payload(), TASK_ID
206+
)
207+
208+
assert component.task_context_manager.get_context(TASK_ID) is None
209+
assert (
210+
component.task_context_manager.get_context(f"{TASK_ID}_stream_buffer")
211+
is None
212+
)
213+
214+
215+
class TestHandleAgentEventHappyPath:
216+
"""Verify _close_external_connections is NOT called by _handle_agent_event
217+
in the happy path (it's the responsibility of _process_parsed_a2a_event)."""
218+
219+
@pytest.mark.asyncio
220+
async def test_successful_event_does_not_close_connections(self):
221+
"""Happy path delegates to _process_parsed_a2a_event without closing."""
222+
component = _build_component()
223+
_store_context(component)
224+
225+
result = await component._handle_agent_event(
226+
"topic/response", _valid_rpc_response_payload(), TASK_ID
227+
)
228+
229+
assert result is True
230+
component._close_external_connections.assert_not_called()
231+
component._send_error_to_external.assert_not_called()
232+
233+
234+
class TestHandleAgentEventNoContext:
235+
"""Edge case: no context stored for the task_id."""
236+
237+
@pytest.mark.asyncio
238+
async def test_missing_context_returns_true_without_closing(self):
239+
"""When no context exists, the method returns early without closing."""
240+
component = _build_component()
241+
242+
result = await component._handle_agent_event(
243+
"topic/response", _valid_rpc_response_payload(), TASK_ID
244+
)
245+
246+
assert result is True
247+
component._close_external_connections.assert_not_called()
248+
component._send_error_to_external.assert_not_called()
249+
250+
@pytest.mark.asyncio
251+
async def test_invalid_payload_returns_false_without_closing(self):
252+
"""Completely invalid payload fails before reaching cleanup paths."""
253+
component = _build_component()
254+
255+
result = await component._handle_agent_event(
256+
"topic/response", {"garbage": True}, TASK_ID
257+
)
258+
259+
assert result is False
260+
component._close_external_connections.assert_not_called()
261+
component._send_error_to_external.assert_not_called()
262+
263+
264+
class TestHandleAgentEventWithRPCError:
265+
"""When the RPC response carries an error, it goes through _process_parsed_a2a_event."""
266+
267+
@pytest.mark.asyncio
268+
async def test_rpc_error_delegates_to_process_parsed_event(self):
269+
"""An error in the RPC response is parsed and delegated, not handled locally."""
270+
component = _build_component()
271+
_store_context(component)
272+
273+
result = await component._handle_agent_event(
274+
"topic/response", _rpc_response_with_error(), TASK_ID
275+
)
276+
277+
assert result is True
278+
component._process_parsed_a2a_event.assert_called_once()
279+
component._close_external_connections.assert_not_called()

0 commit comments

Comments
 (0)