Description
Problem
When a subscriber of Multi<ChatEvent> cancels or errors mid-stream, the current HTTP SSE connection is closed via StreamingHandle.cancel(), but the agent tool execution loop inside QuarkusAiServiceStreamingResponseHandler.onCompleteResponse() continues running unchecked.
After cancellation, the handler keeps:
- Executing tools
- Saving messages to chat memory
- Making new LLM streaming requests
All of this happens silently, because the UnicastProcessor, which no longer has a subscriber, drops every event. The loop only stops once the LLM stops requesting tool calls.
Additionally, the existing stream.getStreamingHandle().cancel() call in TokenStreamMulti only reaches the first round's StreamingHandle. The handlers created for subsequent tool rounds are local variables in onCompleteResponse(), so they are unreachable from the cancellation callback.
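A hypothetical, self-contained simulation of the current behaviour may help pin this down. It uses plain Java only (no Mutiny, LangChain4j or Quarkus types); CurrentLoopDemo, Handle and run are invented names. Each tool round gets a fresh handle, while the cancellation callback only holds the first one.

```java
// Hypothetical simulation of the current behaviour; none of these types are
// the real LangChain4j or Quarkus classes.
final class CurrentLoopDemo {

    // Stand-in for a per-round StreamingHandle: it only records cancel().
    static final class Handle {
        private volatile boolean cancelled;
        void cancel() { cancelled = true; }
        boolean isCancelled() { return cancelled; }
    }

    /**
     * Runs the tool loop for as many rounds as the "LLM" wants. Each round gets
     * a fresh handle, like the handler created locally in onCompleteResponse().
     * The cancellation callback captured only the first round's handle.
     * Returns the number of rounds that executed tools.
     */
    static int run(int llmRounds, int cancelDuringRound) {
        Handle firstRound = new Handle(); // what stream.getStreamingHandle() refers to
        Handle current = firstRound;
        int toolRounds = 0;
        for (int round = 1; round <= llmRounds; round++) {
            if (round == cancelDuringRound) {
                firstRound.cancel(); // subscriber cancels; only round 1's handle is reached
            }
            if (current.isCancelled()) {
                break; // each round can only observe its own handle
            }
            if (round == llmRounds) {
                break; // the LLM stops calling tools: the only other exit
            }
            toolRounds++;           // tools execute, chat memory is updated...
            current = new Handle(); // ...and the next chat() call gets an unreachable handle
        }
        return toolRounds;
    }
}
```

With five LLM rounds, cancelling during round 3 still lets all four tool rounds run, exactly as if nothing had been cancelled; only a cancellation during round 1 stops the loop.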
Proposed Solution
Introduce a shared AtomicBoolean cancelled flag:
- Created in TokenStreamMulti.subscribe() and set to true in the onCancellation callback:
    Multi<ChatEvent> cancellableMulti = processor.onCancellation().invoke(() -> {
        cancelled.set(true);
    });
- Passed through QuarkusAiServiceTokenStream to QuarkusAiServiceStreamingResponseHandler
- The handler checks the flag before executing tools and before making the next chat() call
- The handler also checks the flag in onPartialResponse() and, if it is set, cancels its own StreamingHandle; since every round's handler does this, cancellation works uniformly across all rounds
- The same AtomicBoolean reference is passed to new handlers created for subsequent rounds
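A minimal sketch of the proposed shared flag, under stated assumptions: SharedFlagDemo, Handler, Handle and run are hypothetical stand-ins, not the real QuarkusAiServiceStreamingResponseHandler API, and onCompleteResponse here takes a boolean ("does the LLM want more tools?") in place of a real response object. It only illustrates where the checks sit.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedFlagDemo {

    // Hypothetical stand-in for a per-round StreamingHandle.
    static final class Handle {
        private volatile boolean cancelled;
        void cancel() { cancelled = true; }
        boolean isCancelled() { return cancelled; }
    }

    // Hypothetical, simplified stand-in for the streaming response handler.
    static final class Handler {
        private final AtomicBoolean cancelled;      // same reference in every round
        private final Handle handle = new Handle(); // this round's own handle
        private final AtomicInteger toolRuns;

        Handler(AtomicBoolean cancelled, AtomicInteger toolRuns) {
            this.cancelled = cancelled;
            this.toolRuns = toolRuns;
        }

        // Proposed check in onPartialResponse(): self-cancel this round's stream.
        void onPartialResponse(String token) {
            if (cancelled.get()) {
                handle.cancel();
            }
        }

        // Proposed checks in onCompleteResponse(); returns the next round's
        // handler, or null when the loop stops.
        Handler onCompleteResponse(boolean llmWantsTools) {
            if (cancelled.get() || !llmWantsTools) {
                return null; // do not execute tools
            }
            toolRuns.incrementAndGet(); // "execute tools"
            if (cancelled.get()) {
                return null; // re-check before the next chat() call
            }
            return new Handler(cancelled, toolRuns); // pass the same flag on
        }

        boolean handleCancelled() { return handle.isCancelled(); }
    }

    /** Runs up to llmRounds rounds, cancelling during cancelDuringRound; returns tool runs. */
    static int run(int llmRounds, int cancelDuringRound) {
        AtomicBoolean cancelled = new AtomicBoolean(false); // created at subscribe() time
        AtomicInteger toolRuns = new AtomicInteger();
        Handler handler = new Handler(cancelled, toolRuns);
        for (int round = 1; handler != null; round++) {
            if (round == cancelDuringRound) {
                cancelled.set(true); // what the onCancellation callback would do
            }
            handler.onPartialResponse("token");
            handler = handler.onCompleteResponse(round < llmRounds);
        }
        return toolRuns.get();
    }
}
```

Here run(5, 3) executes tools only in rounds 1 and 2 and returns 2, whereas run(5, 0) returns 4. Because the flag is shared by reference, the cancellation callback never needs to know which round is currently active; each handler observes the flag on its own.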
Happy to work on the solution for this issue.