Skip to content

Commit 4391438

Browse files
authored
Transfer network bytes to smaller buffer (#63265)
Currently we read in 64KB blocks from the network. When TLS is not enabled, these bytes are normally passed all the way to the application layer (some exceptions: compression). For the HTTP layer this means that these bytes can live throughout the entire lifecycle of an indexing request. The problem is that if the reads from the socket are small, this means that 64KB buffers can be consumed by 1KB or smaller reads. If the socket buffer or TCP buffer sizes are small, the leads to massive memory waste. It has been identified as a major source of OOMs on coordinating nodes as Elasticsearch easily exhausts the heap for these network bytes. This commit resolves the problem by placing a handler after the TLS handler to copy these bytes to a more appropriate buffer size as necessary. This comes after TLS, because TLS is a framing layer which often resolves this problem for us (the 64KB buffer will be decoded into a more appropriate buffer size). However, this extra handler will solve it for the non-TLS pipelines.
1 parent 7f1d0a7 commit 4391438

File tree

3 files changed

+52
-2
lines changed

3 files changed

+52
-2
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@
6161
import org.elasticsearch.http.HttpServerChannel;
6262
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
6363
import org.elasticsearch.threadpool.ThreadPool;
64-
import org.elasticsearch.transport.SharedGroupFactory;
6564
import org.elasticsearch.transport.NettyAllocator;
65+
import org.elasticsearch.transport.NettyByteBufSizer;
66+
import org.elasticsearch.transport.SharedGroupFactory;
6667
import org.elasticsearch.transport.netty4.Netty4Utils;
6768

6869
import java.net.InetSocketAddress;
@@ -284,13 +285,15 @@ public ChannelHandler configureServerChannelHandler() {
284285
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
285286

286287
private final Netty4HttpServerTransport transport;
288+
private final NettyByteBufSizer byteBufSizer;
287289
private final Netty4HttpRequestCreator requestCreator;
288290
private final Netty4HttpRequestHandler requestHandler;
289291
private final HttpHandlingSettings handlingSettings;
290292

291293
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
292294
this.transport = transport;
293295
this.handlingSettings = handlingSettings;
296+
this.byteBufSizer = new NettyByteBufSizer();
294297
this.requestCreator = new Netty4HttpRequestCreator();
295298
this.requestHandler = new Netty4HttpRequestHandler(transport);
296299
}
@@ -299,6 +302,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
299302
protected void initChannel(Channel ch) throws Exception {
300303
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
301304
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
305+
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
302306
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
303307
final HttpRequestDecoder decoder = new HttpRequestDecoder(
304308
handlingSettings.getMaxInitialLineLength(),
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.channel.ChannelHandler;
24+
import io.netty.channel.ChannelHandlerContext;
25+
import io.netty.handler.codec.MessageToMessageDecoder;
26+
27+
import java.util.List;
28+
29+
@ChannelHandler.Sharable
30+
public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {
31+
32+
@Override
33+
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
34+
int readableBytes = buf.readableBytes();
35+
if (buf.capacity() >= 1024) {
36+
ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
37+
assert resized.readableBytes() == readableBytes;
38+
out.add(resized.retain());
39+
} else {
40+
out.add(buf.retain());
41+
}
42+
}
43+
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353
import org.elasticsearch.core.internal.net.NetUtils;
5454
import org.elasticsearch.indices.breaker.CircuitBreakerService;
5555
import org.elasticsearch.threadpool.ThreadPool;
56-
import org.elasticsearch.transport.SharedGroupFactory;
5756
import org.elasticsearch.transport.NettyAllocator;
57+
import org.elasticsearch.transport.NettyByteBufSizer;
58+
import org.elasticsearch.transport.SharedGroupFactory;
5859
import org.elasticsearch.transport.TcpTransport;
5960
import org.elasticsearch.transport.TransportSettings;
6061

@@ -327,6 +328,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
327328
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
328329

329330
protected final String name;
331+
private final NettyByteBufSizer sizer = new NettyByteBufSizer();
330332

331333
protected ServerChannelInitializer(String name) {
332334
this.name = name;
@@ -337,6 +339,7 @@ protected void initChannel(Channel ch) throws Exception {
337339
addClosedExceptionLogger(ch);
338340
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
339341
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
342+
ch.pipeline().addLast("byte_buf_sizer", sizer);
340343
ch.pipeline().addLast("logging", new ESLoggingHandler());
341344
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
342345
serverAcceptedChannel(nettyTcpChannel);

0 commit comments

Comments
 (0)