Skip to content

Commit cb21030

Browse files
committed
Merge remote-tracking branch 'upstream/main' into add_source_context
2 parents 418638f + 937f80c commit cb21030

File tree

19 files changed

+446
-26
lines changed

19 files changed

+446
-26
lines changed

docs/changelog/131940.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
pr: 131940
summary: Allow remote enrich after LOOKUP JOIN
area: ES|QL
type: enhancement
issues: []

docs/changelog/133775.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
pr: 133775
summary: Remove Transfer-Encoding from HTTP request with no content
area: Network
type: bug
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.elasticsearch.transport.netty4.Netty4Utils;
8686
import org.elasticsearch.xcontent.json.JsonXContent;
8787

88+
import java.io.InputStream;
8889
import java.nio.channels.ClosedChannelException;
8990
import java.nio.charset.StandardCharsets;
9091
import java.util.Collection;
@@ -392,6 +393,23 @@ public void testOversizedChunkedEncoding() throws Exception {
392393
}
393394
}
394395

396+
public void testEmptyChunkedEncoding() throws Exception {
397+
try (var clientContext = newClientContext()) {
398+
var opaqueId = clientContext.newOpaqueId();
399+
final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream()));
400+
final var request = httpRequest(opaqueId, 0);
401+
HttpUtil.setTransferEncodingChunked(request, true);
402+
clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());
403+
clientContext.channel().writeAndFlush(request);
404+
clientContext.channel().writeAndFlush(emptyStream);
405+
406+
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
407+
var restRequest = handler.restRequest;
408+
assertFalse(restRequest.hasContent());
409+
assertNull(restRequest.header("Transfer-Encoding"));
410+
}
411+
}
412+
395413
// ensures that we don't leak buffers in stream on 400-bad-request
396414
// some bad requests are dispatched from rest-controller before reaching rest handler
397415
// test relies on netty's buffer leak detection
@@ -733,15 +751,17 @@ Channel channel() {
733751
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
734752
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
735753
final String opaqueId;
754+
final RestRequest restRequest;
736755
private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();
737756
final Netty4HttpRequestBodyStream stream;
738757
RestChannel channel;
739758
boolean receivedLastChunk = false;
740759
final CountDownLatch closedLatch = new CountDownLatch(1);
741760
volatile boolean shouldThrowInsideHandleChunk = false;
742761

743-
ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
762+
ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {
744763
this.opaqueId = opaqueId;
764+
this.restRequest = restRequest;
745765
this.stream = stream;
746766
}
747767

@@ -934,7 +954,7 @@ public List<Route> routes() {
934954
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
935955
var stream = (Netty4HttpRequestBodyStream) request.contentStream();
936956
var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);
937-
var handler = new ServerRequestHandler(opaqueId, stream);
957+
var handler = new ServerRequestHandler(opaqueId, request, stream);
938958
handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);
939959
return handler;
940960
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.channel.ChannelHandlerContext;
13+
import io.netty.channel.ChannelInboundHandlerAdapter;
14+
import io.netty.handler.codec.http.HttpContent;
15+
import io.netty.handler.codec.http.HttpRequest;
16+
import io.netty.handler.codec.http.HttpUtil;
17+
import io.netty.handler.codec.http.LastHttpContent;
18+
19+
public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter {
20+
21+
private HttpRequest currentRequest;
22+
23+
@Override
24+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25+
switch (msg) {
26+
case HttpRequest request -> {
27+
if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) {
28+
currentRequest = request;
29+
ctx.read();
30+
} else {
31+
currentRequest = null;
32+
ctx.fireChannelRead(request);
33+
}
34+
}
35+
case HttpContent content -> {
36+
if (currentRequest != null) {
37+
if (content instanceof LastHttpContent && content.content().readableBytes() == 0) {
38+
HttpUtil.setTransferEncodingChunked(currentRequest, false);
39+
}
40+
ctx.fireChannelRead(currentRequest);
41+
ctx.fireChannelRead(content);
42+
currentRequest = null;
43+
} else {
44+
ctx.fireChannelRead(content);
45+
}
46+
}
47+
default -> ctx.fireChannelRead(msg);
48+
}
49+
}
50+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t
414414
if (ResourceLeakDetector.isEnabled()) {
415415
ch.pipeline().addLast(new Netty4LeakDetectionHandler());
416416
}
417+
ch.pipeline().addLast(new Netty4EmptyChunkHandler());
417418
// See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
418419
// can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
419420
// resolved we must add another flow controller here:
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.buffer.Unpooled;
13+
import io.netty.channel.embedded.EmbeddedChannel;
14+
import io.netty.handler.codec.DecoderResult;
15+
import io.netty.handler.codec.http.DefaultHttpRequest;
16+
import io.netty.handler.codec.http.DefaultLastHttpContent;
17+
import io.netty.handler.codec.http.HttpMessage;
18+
import io.netty.handler.codec.http.HttpMethod;
19+
import io.netty.handler.codec.http.HttpRequest;
20+
import io.netty.handler.codec.http.HttpUtil;
21+
import io.netty.handler.codec.http.HttpVersion;
22+
23+
import org.elasticsearch.test.ESTestCase;
24+
25+
public class Netty4EmptyChunkHandlerTests extends ESTestCase {
26+
27+
private EmbeddedChannel channel;
28+
29+
@Override
30+
public void setUp() throws Exception {
31+
super.setUp();
32+
channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
33+
channel.config().setAutoRead(false);
34+
}
35+
36+
public void testNonChunkedPassthrough() {
37+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
38+
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
39+
channel.writeInbound(req, content);
40+
assertEquals(req, channel.readInbound());
41+
assertEquals(content, channel.readInbound());
42+
}
43+
44+
public void testDecodingFailurePassthrough() {
45+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
46+
HttpUtil.setTransferEncodingChunked(req, true);
47+
req.setDecoderResult(DecoderResult.failure(new Exception()));
48+
channel.writeInbound(req);
49+
var recvReq = (HttpRequest) channel.readInbound();
50+
assertTrue(recvReq.decoderResult().isFailure());
51+
assertTrue(HttpUtil.isTransferEncodingChunked(recvReq));
52+
}
53+
54+
public void testHoldChunkedRequest() {
55+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
56+
HttpUtil.setTransferEncodingChunked(req, true);
57+
var readSniffer = new ReadSniffer();
58+
channel.pipeline().addFirst(readSniffer);
59+
channel.writeInbound(req);
60+
assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound());
61+
assertEquals("must read first chunk when holding request", 1, readSniffer.readCount);
62+
}
63+
64+
public void testRemoveEncodingFromEmpty() {
65+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
66+
HttpUtil.setTransferEncodingChunked(req, true);
67+
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
68+
channel.writeInbound(req, content);
69+
var recvReq = channel.readInbound();
70+
assertEquals(req, recvReq);
71+
assertEquals(content, channel.readInbound());
72+
assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
73+
}
74+
75+
public void testKeepEncodingForNonEmpty() {
76+
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
77+
HttpUtil.setTransferEncodingChunked(req, true);
78+
var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024))));
79+
channel.writeInbound(req, content);
80+
var recvReq = channel.readInbound();
81+
assertEquals(req, recvReq);
82+
assertEquals(content, channel.readInbound());
83+
assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
84+
}
85+
86+
public void testRandomizedChannelReuse() {
87+
for (int i = 0; i < 1000; i++) {
88+
switch (between(0, 3)) {
89+
case 0 -> testNonChunkedPassthrough();
90+
case 1 -> testKeepEncodingForNonEmpty();
91+
case 2 -> testDecodingFailurePassthrough();
92+
default -> testRemoveEncodingFromEmpty();
93+
}
94+
}
95+
}
96+
}

muted-tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,6 @@ tests:
519519
- class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT
520520
method: testGroupBySubset
521521
issue: https://github.com/elastic/elasticsearch/issues/133220
522-
- class: org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDeciderIT
523-
method: testHighNodeWriteLoadPreventsNewShardAllocation
524-
issue: https://github.com/elastic/elasticsearch/issues/133857
525522
- class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests
526523
method: testParseOrQuery
527524
issue: https://github.com/elastic/elasticsearch/issues/133863
@@ -543,6 +540,9 @@ tests:
543540
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
544541
method: testWithCustomFeatureProcessors
545542
issue: https://github.com/elastic/elasticsearch/issues/134001
543+
- class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT
544+
method: testRateGroupBySubset
545+
issue: https://github.com/elastic/elasticsearch/issues/134019
546546

547547
# Examples:
548548
#

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
5252
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
53+
import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad;
5354

5455
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
5556
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
@@ -68,6 +69,8 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
6869
*/
6970
public void testHighNodeWriteLoadPreventsNewShardAllocation() {
7071
int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
72+
int numberOfWritePoolThreads = randomIntBetween(2, 20);
73+
float shardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
7174
Settings settings = Settings.builder()
7275
.put(
7376
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
@@ -115,7 +118,14 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
115118
);
116119

117120
String indexName = randomIdentifier();
118-
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental.
121+
int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental.
122+
123+
// Calculate the maximum utilization a node can report while still being able to accept all relocating shards
124+
double additionalLoadFromAllShards = calculateUtilizationForWriteLoad(
125+
shardWriteLoad * randomNumberOfShards,
126+
numberOfWritePoolThreads
127+
);
128+
int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1;
119129

120130
var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
121131
var indexRoutingTable = clusterState.routingTable().index(indexName);
@@ -154,20 +164,20 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
154164
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName);
155165
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
156166
firstDiscoveryNode,
157-
2,
158-
0.5f,
167+
numberOfWritePoolThreads,
168+
randomIntBetween(0, maxUtilizationPercent) / 100f,
159169
0
160170
);
161171
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
162172
secondDiscoveryNode,
163-
2,
164-
0.5f,
173+
numberOfWritePoolThreads,
174+
randomIntBetween(0, maxUtilizationPercent) / 100f,
165175
0
166176
);
167177
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
168178
thirdDiscoveryNode,
169-
2,
170-
randomUtilizationThresholdPercent + 1 / 100,
179+
numberOfWritePoolThreads,
180+
(randomUtilizationThresholdPercent + 1) / 100f,
171181
0
172182
);
173183

@@ -197,12 +207,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
197207
.getMetadata()
198208
.getProject()
199209
.index(indexName);
200-
double shardWriteLoadDefault = 0.2;
201210
MockTransportService.getInstance(firstDataNodeName)
202211
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
203212
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
204213
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
205-
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId));
214+
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId));
206215
}
207216
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
208217
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()));

server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,21 @@ public static float updateNodeUtilizationWithShardMovements(
124124
float shardWriteLoadDelta,
125125
int numberOfWriteThreads
126126
) {
127-
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
127+
float newNodeUtilization = nodeUtilization + calculateUtilizationForWriteLoad(shardWriteLoadDelta, numberOfWriteThreads);
128128
return (float) Math.max(newNodeUtilization, 0.0);
129129
}
130130

131+
/**
132+
* Calculate what percentage utilization increase would result from adding some amount of write-load
133+
*
134+
* @param totalShardWriteLoad The write-load being added/removed
135+
* @param numberOfThreads The number of threads in the node-being-added-to's write thread pool
136+
* @return The change in percentage utilization
137+
*/
138+
public static float calculateUtilizationForWriteLoad(float totalShardWriteLoad, int numberOfThreads) {
139+
return totalShardWriteLoad / numberOfThreads;
140+
}
141+
131142
/**
132143
* Adjust the max thread pool queue latency by accounting for whether shard has moved away from the node.
133144
* @param maxThreadPoolQueueLatencyMillis The current max thread pool queue latency.

0 commit comments

Comments
 (0)