Skip to content

Commit d4d7141

Browse files
ywangdmhl-b
andauthored
fix trappy http stream tests (#116829) (#116905)
(cherry picked from commit 9489726) # Conflicts: # modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java # modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java # muted-tests.yml Co-authored-by: Mikhail Berezovskiy <[email protected]>
1 parent 68337ff commit d4d7141

File tree

4 files changed

+15
-12
lines changed

4 files changed

+15
-12
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
173173

174174
// await stream handler is ready and request full content
175175
var handler = ctx.awaitRestChannelAccepted(opaqueId);
176-
assertBusy(() -> assertNotNull(handler.stream.buf()));
176+
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
177177

178178
// enable auto-read to receive channel close event
179179
handler.stream.channel().config().setAutoRead(true);
@@ -182,7 +182,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
182182
// terminate connection and wait resources are released
183183
ctx.clientChannel.close();
184184
assertBusy(() -> {
185-
assertNull(handler.stream.buf());
185+
assertEquals(0, handler.stream.bufSize());
186186
assertTrue(handler.streamClosed);
187187
});
188188
}
@@ -199,13 +199,13 @@ public void testServerCloseConnectionMidStream() throws Exception {
199199

200200
// await stream handler is ready and request full content
201201
var handler = ctx.awaitRestChannelAccepted(opaqueId);
202-
assertBusy(() -> assertNotNull(handler.stream.buf()));
202+
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
203203
assertFalse(handler.streamClosed);
204204

205205
// terminate connection on server and wait resources are released
206206
handler.channel.request().getHttpChannel().close();
207207
assertBusy(() -> {
208-
assertNull(handler.stream.buf());
208+
assertEquals(0, handler.stream.bufSize());
209209
assertTrue(handler.streamClosed);
210210
});
211211
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
3535
private final ChannelFutureListener closeListener = future -> doClose();
3636
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
3737
private ByteBuf buf;
38-
private boolean hasLast = false;
3938
private boolean requested = false;
4039
private boolean closing = false;
4140
private HttpBody.ChunkHandler handler;
4241

42+
// used in tests
43+
private volatile int bufSize = 0;
44+
private volatile boolean hasLast = false;
45+
4346
public Netty4HttpRequestBodyStream(Channel channel) {
4447
this.channel = channel;
4548
Netty4Utils.addListener(channel.closeFuture(), closeListener);
@@ -106,6 +109,7 @@ private void addChunk(ByteBuf chunk) {
106109
comp.addComponent(true, chunk);
107110
buf = comp;
108111
}
112+
bufSize = buf.readableBytes();
109113
}
110114

111115
// visible for test
@@ -114,8 +118,8 @@ Channel channel() {
114118
}
115119

116120
// visible for test
117-
ByteBuf buf() {
118-
return buf;
121+
int bufSize() {
122+
return bufSize;
119123
}
120124

121125
// visible for test
@@ -129,6 +133,7 @@ private void send() {
129133
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
130134
requested = false;
131135
buf = null;
136+
bufSize = 0;
132137
for (var tracer : tracingHandlers) {
133138
tracer.onNext(bytesRef, hasLast);
134139
}
@@ -155,6 +160,7 @@ private void doClose() {
155160
if (buf != null) {
156161
buf.release();
157162
buf = null;
163+
bufSize = 0;
158164
}
159165
channel.config().setAutoRead(true);
160166
}

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void testEnqueueChunksBeforeRequest() {
5858
for (int i = 0; i < totalChunks; i++) {
5959
channel.writeInbound(randomContent(1024));
6060
}
61-
assertEquals(totalChunks * 1024, stream.buf().readableBytes());
61+
assertEquals(totalChunks * 1024, stream.bufSize());
6262
}
6363

6464
// ensures all received chunks can be flushed downstream
@@ -101,7 +101,7 @@ public void testReadFromChannel() {
101101
channel.writeInbound(randomLastContent(chunkSize));
102102

103103
for (int i = 0; i < totalChunks; i++) {
104-
assertNull("should not enqueue chunks", stream.buf());
104+
assertEquals("should not enqueue chunks", 0, stream.bufSize());
105105
stream.next();
106106
channel.runPendingTasks();
107107
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,6 @@ tests:
286286
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
287287
method: testEnterpriseDownloaderTask
288288
issue: https://github.com/elastic/elasticsearch/issues/115163
289-
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
290-
method: testServerCloseConnectionMidStream
291-
issue: https://github.com/elastic/elasticsearch/issues/116774
292289

293290
# Examples:
294291
#

0 commit comments

Comments
 (0)