@@ -154,70 +154,51 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
154154 }
155155
156156 // Determine interrupt behavior
157- boolean shouldInterrupt = false ;
158- boolean continueInBackground = false ;
159157 boolean isFinalEvent = (event instanceof Task task && task .getStatus ().state ().isFinal ())
160158 || (event instanceof TaskStatusUpdateEvent tsue && tsue .isFinal ());
161159 boolean isAuthRequired = (event instanceof Task task && task .getStatus ().state () == TaskState .AUTH_REQUIRED )
162160 || (event instanceof TaskStatusUpdateEvent tsue && tsue .getStatus ().state () == TaskState .AUTH_REQUIRED );
163161
164- // Always interrupt on auth_required, as it needs external action.
165- if (isAuthRequired ) {
166162 // auth-required is a special state: the message should be
167163 // escalated back to the caller, but the agent is expected to
168164 // continue producing events once the authorization is received
169165 // out-of-band. This is in contrast to input-required, where a
170166 // new request is expected in order for the agent to make progress,
171167 // so the agent should exit.
172- shouldInterrupt = true ;
173- continueInBackground = true ;
174- }
175- else if (!blocking ) {
176- // For non-blocking calls, interrupt as soon as a task is available.
177- shouldInterrupt = true ;
178- continueInBackground = true ;
179- }
180- else if (blocking ) {
168+ if (!blocking ) {
181169 // For blocking calls: Interrupt to free Vert.x thread, but continue in background
182170 // Python's async consumption doesn't block threads, but Java's does
183171 // So we interrupt to return quickly, then rely on background consumption
184172 // DefaultRequestHandler will fetch the final state from TaskStore
185- shouldInterrupt = true ;
186- continueInBackground = true ;
187173 if (LOGGER .isDebugEnabled ()) {
188174 LOGGER .debug ("Blocking call for task {}: {} event, returning with background consumption" ,
189175 taskIdForLogging (), isFinalEvent ? "final" : "non-final" );
190176 }
191177 }
192178
193- if (shouldInterrupt ) {
194- // Complete the future to unblock the main thread
195- interrupted .set (true );
196- completionFuture .complete (null );
197-
198- // For blocking calls, DON'T complete consumptionCompletionFuture here.
199- // Let it complete naturally when subscription finishes (onComplete callback below).
200- // This ensures all events are processed and persisted to TaskStore before
201- // DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
202- //
203- // For non-blocking and auth-required calls, complete immediately to allow
204- // cleanup to proceed while consumption continues in background.
205- if (!blocking ) {
206- consumptionCompletionFuture .complete (null );
207- }
208- // else: blocking calls wait for actual consumption completion in onComplete
179+ // Complete the future to unblock the main thread
180+ interrupted .set (true );
181+ completionFuture .complete (null );
209182
210- // Continue consuming in background - keep requesting events
211- // Note: continueInBackground is always true when shouldInterrupt is true
212- // (auth-required, non-blocking, or blocking all set it to true)
213- if (LOGGER .isDebugEnabled ()) {
214- String reason = isAuthRequired ? "auth-required" : (blocking ? "blocking" : "non-blocking" );
215- LOGGER .debug ("Task {}: Continuing background consumption (reason: {})" , taskIdForLogging (), reason );
216- }
217- return true ;
183+ // For blocking calls, DON'T complete consumptionCompletionFuture here.
184+ // Let it complete naturally when subscription finishes (onComplete callback below).
185+ // This ensures all events are processed and persisted to TaskStore before
186+ // DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
187+ //
188+ // For non-blocking and auth-required calls, complete immediately to allow
189+ // cleanup to proceed while consumption continues in background.
190+ if (!blocking ) {
191+ consumptionCompletionFuture .complete (null );
218192 }
193+ // else: blocking calls wait for actual consumption completion in onComplete
219194
220- // Continue processing
195+ // Continue consuming in background - keep requesting events
196+ // Note: continueInBackground is always true when shouldInterrupt is true
197+ // (auth-required, non-blocking, or blocking all set it to true)
198+ if (LOGGER .isDebugEnabled ()) {
199+ String reason = isAuthRequired ? "auth-required" : (blocking ? "blocking" : "non-blocking" );
200+ LOGGER .debug ("Task {}: Continuing background consumption (reason: {})" , taskIdForLogging (), reason );
201+ }
221202 return true ;
222203 },
223204 throwable -> {
0 commit comments