Skip to content

Commit 3e1d476

Browse files
committed
Review fixes
1 parent 306bacc commit 3e1d476

File tree

1 file changed

+24
-5
lines changed

1 file changed

+24
-5
lines changed

extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import static org.junit.jupiter.api.Assertions.assertTrue;
99

1010
import java.io.IOException;
11+
import java.util.HashSet;
1112
import java.util.List;
13+
import java.util.Set;
1214
import java.util.concurrent.CountDownLatch;
1315
import java.util.concurrent.TimeUnit;
1416
import java.util.concurrent.atomic.AtomicBoolean;
@@ -237,7 +239,9 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {
237239
}
238240
};
239241

240-
// Create error handler - filter out benign stream closed errors
242+
// Create error handler - filter out benign stream closed errors.
243+
// HTTP/2 streams are cancelled during normal cleanup when subscriptions end,
244+
// which is expected behavior and not an actual error condition.
241245
Consumer<Throwable> errorHandler = error -> {
242246
if (!isStreamClosedError(error)) {
243247
errorRef.set(error);
@@ -429,9 +433,24 @@ public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
429433
/**
430434
* Checks if an error is a benign stream closed/cancelled error that should be ignored.
431435
* HTTP/2 streams can be cancelled during normal cleanup, which is not an actual error.
436+
*
437+
* @param error the throwable to check (may be null)
438+
* @return true if this is a benign stream closure error that should be ignored
432439
*/
433440
private boolean isStreamClosedError(Throwable error) {
434-
if (error == null) {
441+
return isStreamClosedError(error, new HashSet<>());
442+
}
443+
444+
/**
445+
* Internal recursive implementation with cycle detection to prevent infinite recursion.
446+
*
447+
* @param error the throwable to check
448+
* @param visited set of already-visited throwables to detect cycles
449+
* @return true if this is a benign stream closure error
450+
*/
451+
private boolean isStreamClosedError(Throwable error, Set<Throwable> visited) {
452+
if (error == null || !visited.add(error)) {
453+
// Null or already visited (cycle detected)
435454
return false;
436455
}
437456

@@ -449,10 +468,10 @@ private boolean isStreamClosedError(Throwable error) {
449468
}
450469
}
451470

452-
// Check cause recursively
471+
// Check cause recursively with cycle detection
453472
Throwable cause = error.getCause();
454-
if (cause != null && cause != error) {
455-
return isStreamClosedError(cause);
473+
if (cause != null) {
474+
return isStreamClosedError(cause, visited);
456475
}
457476

458477
return false;

0 commit comments

Comments
 (0)