Skip to content

Commit 713120f

Browse files
authored
fix: Filter benign stream errors in KafkaReplicationIntegrationTest and improve CI diagnostics (#398)
- Filter HTTP/2 stream cancellation errors in KafkaReplicationIntegrationTest (same fix as PR #380)
- Add surefire reports and build logs upload to build-and-test workflow
- Enhance TCK workflow to capture test output, server logs, and compliance reports
1 parent 85dd747 commit 713120f

File tree

3 files changed

+118
-58
lines changed

3 files changed

+118
-58
lines changed

.github/workflows/build-and-test.yml

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,3 +27,23 @@ jobs:
2727
cache: maven
2828
- name: Build with Maven and run tests
2929
run: mvn -B package --file pom.xml -fae
30+
- name: Upload Test Reports
31+
if: failure()
32+
uses: actions/upload-artifact@v4
33+
with:
34+
name: surefire-reports-java-${{ matrix.java-version }}
35+
path: |
36+
**/target/surefire-reports/
37+
**/target/failsafe-reports/
38+
retention-days: 7
39+
if-no-files-found: warn
40+
- name: Upload Build Logs
41+
if: failure()
42+
uses: actions/upload-artifact@v4
43+
with:
44+
name: build-logs-java-${{ matrix.java-version }}
45+
path: |
46+
**/target/*.log
47+
**/target/quarkus.log
48+
retention-days: 3
49+
if-no-files-found: ignore

.github/workflows/run-tck.yml

Lines changed: 42 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -100,85 +100,71 @@ jobs:
100100
id: run-tck
101101
timeout-minutes: 5
102102
run: |
103-
./run_tck.py --sut-url ${{ env.SUT_JSONRPC_URL }} --category all --transports jsonrpc,grpc,rest --compliance-report report.json
103+
./run_tck.py --sut-url ${{ env.SUT_JSONRPC_URL }} --category all --transports jsonrpc,grpc,rest --compliance-report report.json 2>&1 | tee tck-output.log
104104
working-directory: tck/a2a-tck
105-
- name: Capture Thread Dump
105+
- name: Capture Diagnostics on Failure
106106
if: failure()
107107
run: |
108+
echo "=== Capturing diagnostic information ==="
109+
110+
# Create diagnostics directory
111+
mkdir -p tck/target/diagnostics
112+
113+
# Capture process list
114+
echo "📋 Capturing process list..."
115+
ps auxww > tck/target/diagnostics/processes.txt
116+
108117
# Find the actual Quarkus JVM (child of Maven process), not the Maven parent
109118
# Look for the dev.jar process which is the actual application
110119
QUARKUS_PID=$(pgrep -f "a2a-tck-server-dev.jar" || echo "")
111120
if [ -n "$QUARKUS_PID" ]; then
112121
echo "📊 Capturing thread dump for Quarkus JVM PID $QUARKUS_PID"
113-
jstack $QUARKUS_PID > tck/target/thread-dump.txt || echo "Failed to capture thread dump"
114-
if [ -f tck/target/thread-dump.txt ]; then
115-
echo "✅ Thread dump captured ($(wc -l < tck/target/thread-dump.txt) lines)"
122+
jstack $QUARKUS_PID > tck/target/diagnostics/thread-dump.txt || echo "Failed to capture thread dump"
123+
if [ -f tck/target/diagnostics/thread-dump.txt ]; then
124+
echo "✅ Thread dump captured ($(wc -l < tck/target/diagnostics/thread-dump.txt) lines)"
116125
fi
117126
else
118127
echo "⚠️ No Quarkus JVM process found for thread dump"
119128
echo "Available Java processes:"
120-
ps aux | grep java || true
129+
ps aux | grep java | tee -a tck/target/diagnostics/processes.txt || true
121130
fi
122-
- name: Capture Heap Dump
123-
if: failure()
124-
run: |
125-
# Find the actual Quarkus JVM (child of Maven process), not the Maven parent
126-
QUARKUS_PID=$(pgrep -f "a2a-tck-server-dev.jar" || echo "")
127-
if [ -n "$QUARKUS_PID" ]; then
128-
echo "📊 Capturing heap dump for Quarkus JVM PID $QUARKUS_PID"
129-
jmap -dump:live,format=b,file=tck/target/heap-dump.hprof $QUARKUS_PID || echo "Failed to capture heap dump"
130-
if [ -f tck/target/heap-dump.hprof ]; then
131-
SIZE=$(du -h tck/target/heap-dump.hprof | cut -f1)
132-
echo "✅ Heap dump captured ($SIZE)"
133-
# Compress to reduce artifact size
134-
gzip tck/target/heap-dump.hprof
135-
COMPRESSED_SIZE=$(du -h tck/target/heap-dump.hprof.gz | cut -f1)
136-
echo "✅ Compressed heap dump ($COMPRESSED_SIZE)"
137-
fi
138-
else
139-
echo "⚠️ No Quarkus JVM process found for heap dump"
140-
echo "Available Java processes:"
141-
ps aux | grep java || true
131+
132+
# Capture Quarkus application logs (if available)
133+
echo "📝 Checking for Quarkus logs..."
134+
if [ -f tck/target/quarkus.log ]; then
135+
cp tck/target/quarkus.log tck/target/diagnostics/
136+
echo "✅ Copied quarkus.log ($(wc -l < tck/target/quarkus.log) lines)"
137+
fi
138+
139+
# Copy TCK server logs
140+
if [ -f tck/target/tck-test.log ]; then
141+
cp tck/target/tck-test.log tck/target/diagnostics/
142+
echo "✅ Copied tck-test.log ($(wc -l < tck/target/tck-test.log) lines)"
142143
fi
144+
145+
echo ""
146+
echo "=== Diagnostic capture complete ==="
143147
- name: Stop Quarkus Server
144148
if: always()
145149
run: |
146150
# Find and kill the Quarkus process to ensure logs are flushed
147151
pkill -f "quarkus:dev" || true
148152
sleep 2
149-
- name: Verify TCK Log
150-
if: failure()
151-
run: |
152-
echo "Checking for log file..."
153-
if [ -f tck/target/tck-test.log ]; then
154-
echo "✅ Log file exists ($(wc -l < tck/target/tck-test.log) lines)"
155-
ls -lh tck/target/tck-test.log
156-
else
157-
echo "❌ Log file not found at tck/target/tck-test.log"
158-
echo "Contents of tck/target/:"
159-
ls -la tck/target/ || echo "tck/target/ does not exist"
160-
fi
161-
- name: Upload TCK Log
153+
- name: Upload TCK Diagnostics
162154
if: failure()
163155
uses: actions/upload-artifact@v4
164156
with:
165-
name: tck-test-log-java-${{ matrix.java-version }}
166-
path: tck/target/tck-test.log
167-
retention-days: 2
157+
name: tck-diagnostics-java-${{ matrix.java-version }}
158+
path: |
159+
tck/target/diagnostics/
160+
tck/a2a-tck/tck-output.log
161+
retention-days: 7
168162
if-no-files-found: warn
169-
- name: Upload Thread Dump
170-
if: failure()
171-
uses: actions/upload-artifact@v4
172-
with:
173-
name: thread-dump-java-${{ matrix.java-version }}
174-
path: tck/target/thread-dump.txt
175-
retention-days: 2
176-
if-no-files-found: warn
177-
- name: Upload Heap Dump
178-
if: failure()
163+
- name: Upload TCK Compliance Report
164+
if: always()
179165
uses: actions/upload-artifact@v4
180166
with:
181-
name: heap-dump-java-${{ matrix.java-version }}
182-
path: tck/target/heap-dump.hprof.gz
183-
retention-days: 2
184-
if-no-files-found: warn
167+
name: tck-compliance-report-java-${{ matrix.java-version }}
168+
path: tck/a2a-tck/report.json
169+
retention-days: 14
170+
if-no-files-found: ignore

extras/queue-manager-replicated/tests-single-instance/src/test/java/io/a2a/extras/queuemanager/replicated/tests/KafkaReplicationIntegrationTest.java

Lines changed: 56 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,10 @@
77
import static org.junit.jupiter.api.Assertions.assertNull;
88
import static org.junit.jupiter.api.Assertions.assertTrue;
99

10+
import java.io.IOException;
11+
import java.util.HashSet;
1012
import java.util.List;
13+
import java.util.Set;
1114
import java.util.concurrent.CountDownLatch;
1215
import java.util.concurrent.TimeUnit;
1316
import java.util.concurrent.atomic.AtomicBoolean;
@@ -236,9 +239,13 @@ public void testKafkaEventReceivedByA2AServer() throws Exception {
236239
}
237240
};
238241

239-
// Create error handler
242+
// Create error handler - filter out benign stream closed errors.
243+
// HTTP/2 streams are cancelled during normal cleanup when subscriptions end,
244+
// which is expected behavior and not an actual error condition.
240245
Consumer<Throwable> errorHandler = error -> {
241-
errorRef.set(error);
246+
if (!isStreamClosedError(error)) {
247+
errorRef.set(error);
248+
}
242249
resubscribeLatch.countDown();
243250
};
244251

@@ -423,4 +430,51 @@ public void testPoisonPillGenerationOnTaskFinalization() throws Exception {
423430
assertEquals(taskId, closedEvent.getTaskId(), "QueueClosedEvent task ID should match");
424431
}
425432

433+
/**
434+
* Checks if an error is a benign stream closed/cancelled error that should be ignored.
435+
* HTTP/2 streams can be cancelled during normal cleanup, which is not an actual error.
436+
*
437+
* @param error the throwable to check (may be null)
438+
* @return true if this is a benign stream closure error that should be ignored
439+
*/
440+
private boolean isStreamClosedError(Throwable error) {
441+
return isStreamClosedError(error, new HashSet<>());
442+
}
443+
444+
/**
445+
* Internal recursive implementation with cycle detection to prevent infinite recursion.
446+
*
447+
* @param error the throwable to check
448+
* @param visited set of already-visited throwables to detect cycles
449+
* @return true if this is a benign stream closure error
450+
*/
451+
private boolean isStreamClosedError(Throwable error, Set<Throwable> visited) {
452+
if (error == null || !visited.add(error)) {
453+
// Null or already visited (cycle detected)
454+
return false;
455+
}
456+
457+
// Check for IOException which includes stream cancellation
458+
if (error instanceof IOException) {
459+
String message = error.getMessage();
460+
if (message != null) {
461+
// Filter out normal stream closure/cancellation errors
462+
if (message.contains("Stream closed") ||
463+
message.contains("Stream") && message.contains("cancelled") ||
464+
message.contains("EOF reached") ||
465+
message.contains("CANCEL")) {
466+
return true;
467+
}
468+
}
469+
}
470+
471+
// Check cause recursively with cycle detection
472+
Throwable cause = error.getCause();
473+
if (cause != null) {
474+
return isStreamClosedError(cause, visited);
475+
}
476+
477+
return false;
478+
}
479+
426480
}

0 commit comments

Comments
 (0)