Skip to content

Commit 7ab9097

Browse files
authored
Backport explicit http content copy/retain #116115 (#117276)
* backport explicit http content copy/retain #116115 * spotless
1 parent 02e5830 commit 7ab9097

File tree

31 files changed

+1505
-137
lines changed

31 files changed

+1505
-137
lines changed

build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,5 @@ org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.Str
171171
@defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
172172
java.lang.Thread#<init>(java.lang.Runnable)
173173
java.lang.Thread#<init>(java.lang.ThreadGroup, java.lang.Runnable)
174+
175+
org.elasticsearch.common.bytes.BytesReference#copyBytes(org.elasticsearch.common.bytes.BytesReference) @ This method is a subject for removal. Copying bytes is prone to performance regressions and unnecessary allocations.

docs/changelog/116115.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116115
2+
summary: Allow http unsafe buffers by default
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -699,11 +699,6 @@ public Collection<RestHandler> getRestHandlers(
699699
Predicate<NodeFeature> clusterSupportsFeature
700700
) {
701701
return List.of(new BaseRestHandler() {
702-
@Override
703-
public boolean allowsUnsafeBuffers() {
704-
return true;
705-
}
706-
707702
@Override
708703
public String getName() {
709704
return ROUTE;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.http.netty4;
11+
12+
import io.netty.handler.codec.http.HttpResponseStatus;
13+
14+
import org.apache.lucene.util.BytesRef;
15+
import org.elasticsearch.ESNetty4IntegTestCase;
16+
import org.elasticsearch.client.internal.node.NodeClient;
17+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.node.DiscoveryNodes;
19+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
20+
import org.elasticsearch.common.settings.ClusterSettings;
21+
import org.elasticsearch.common.settings.IndexScopedSettings;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.settings.SettingsFilter;
24+
import org.elasticsearch.common.util.CollectionUtils;
25+
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.features.NodeFeature;
27+
import org.elasticsearch.http.HttpServerTransport;
28+
import org.elasticsearch.plugins.ActionPlugin;
29+
import org.elasticsearch.plugins.Plugin;
30+
import org.elasticsearch.rest.BaseRestHandler;
31+
import org.elasticsearch.rest.RestController;
32+
import org.elasticsearch.rest.RestHandler;
33+
import org.elasticsearch.rest.RestRequest;
34+
import org.elasticsearch.rest.RestResponse;
35+
import org.elasticsearch.rest.RestStatus;
36+
37+
import java.io.IOException;
38+
import java.util.Collection;
39+
import java.util.List;
40+
import java.util.function.Predicate;
41+
import java.util.function.Supplier;
42+
43+
public class Netty4TrashingAllocatorIT extends ESNetty4IntegTestCase {
44+
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins() {
47+
return CollectionUtils.concatLists(List.of(Handler.class), super.nodePlugins());
48+
}
49+
50+
@Override
51+
protected boolean addMockHttpTransport() {
52+
return false;
53+
}
54+
55+
public void testTrashContent() throws InterruptedException {
56+
try (var client = new Netty4HttpClient()) {
57+
var addr = randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address();
58+
var content = randomAlphaOfLength(between(1024, 2048));
59+
var responses = client.post(addr, List.of(new Tuple<>(Handler.ROUTE, content)));
60+
assertEquals(HttpResponseStatus.OK, responses.stream().findFirst().get().status());
61+
}
62+
}
63+
64+
public static class Handler extends Plugin implements ActionPlugin {
65+
static final String ROUTE = "/_test/trashing-alloc";
66+
67+
@Override
68+
public Collection<RestHandler> getRestHandlers(
69+
Settings settings,
70+
NamedWriteableRegistry namedWriteableRegistry,
71+
RestController restController,
72+
ClusterSettings clusterSettings,
73+
IndexScopedSettings indexScopedSettings,
74+
SettingsFilter settingsFilter,
75+
IndexNameExpressionResolver indexNameExpressionResolver,
76+
Supplier<DiscoveryNodes> nodesInCluster,
77+
Predicate<NodeFeature> clusterSupportsFeature
78+
) {
79+
return List.of(new BaseRestHandler() {
80+
@Override
81+
public String getName() {
82+
return ROUTE;
83+
}
84+
85+
@Override
86+
public List<Route> routes() {
87+
return List.of(new Route(RestRequest.Method.POST, ROUTE));
88+
}
89+
90+
@Override
91+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
92+
var content = request.releasableContent();
93+
var iter = content.iterator();
94+
return (chan) -> {
95+
request.getHttpRequest().release();
96+
assertFalse(content.hasReferences());
97+
BytesRef br;
98+
while ((br = iter.next()) != null) {
99+
for (int i = br.offset; i < br.offset + br.length; i++) {
100+
if (br.bytes[i] != 0) {
101+
fail(
102+
new AssertionError(
103+
"buffer is not trashed, off="
104+
+ br.offset
105+
+ " len="
106+
+ br.length
107+
+ " pos="
108+
+ i
109+
+ " ind="
110+
+ (i - br.offset)
111+
)
112+
);
113+
}
114+
}
115+
}
116+
chan.sendResponse(new RestResponse(RestStatus.OK, ""));
117+
};
118+
}
119+
});
120+
}
121+
}
122+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12-
import io.netty.buffer.ByteBuf;
1312
import io.netty.buffer.Unpooled;
1413
import io.netty.handler.codec.http.DefaultFullHttpRequest;
1514
import io.netty.handler.codec.http.EmptyHttpHeaders;
@@ -128,39 +127,6 @@ public void release() {
128127
}
129128
}
130129

131-
@Override
132-
public HttpRequest releaseAndCopy() {
133-
assert released.get() == false;
134-
if (pooled == false) {
135-
return this;
136-
}
137-
try {
138-
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
139-
HttpBody newContent;
140-
if (content.isStream()) {
141-
newContent = content;
142-
} else {
143-
newContent = Netty4Utils.fullHttpBodyFrom(copiedContent);
144-
}
145-
return new Netty4HttpRequest(
146-
sequence,
147-
new DefaultFullHttpRequest(
148-
request.protocolVersion(),
149-
request.method(),
150-
request.uri(),
151-
copiedContent,
152-
request.headers(),
153-
request.trailingHeaders()
154-
),
155-
new AtomicBoolean(false),
156-
false,
157-
newContent
158-
);
159-
} finally {
160-
release();
161-
}
162-
}
163-
164130
@Override
165131
public final Map<String, List<String>> getHeaders() {
166132
return headers;

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public boolean hasReferences() {
179179
}
180180

181181
public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {
182-
return new HttpBody.ByteRefHttpBody(toBytesReference(buf));
182+
return new HttpBody.ByteRefHttpBody(toReleasableBytesReference(buf));
183183
}
184184

185185
public static Recycler<BytesRef> createRecycler(Settings settings) {

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import org.elasticsearch.common.recycler.Recycler;
2525
import org.elasticsearch.common.unit.ByteSizeValue;
2626
import org.elasticsearch.common.util.PageCacheRecycler;
27+
import org.elasticsearch.core.Assertions;
2728
import org.elasticsearch.core.Booleans;
2829
import org.elasticsearch.monitor.jvm.JvmInfo;
2930

31+
import java.util.Arrays;
3032
import java.util.concurrent.atomic.AtomicBoolean;
3133

3234
public class NettyAllocator {
@@ -44,8 +46,9 @@ public class NettyAllocator {
4446
private static final String USE_NETTY_DEFAULT_CHUNK = "es.unsafe.use_netty_default_chunk_and_page_size";
4547

4648
static {
49+
ByteBufAllocator allocator;
4750
if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
48-
ALLOCATOR = ByteBufAllocator.DEFAULT;
51+
allocator = ByteBufAllocator.DEFAULT;
4952
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
5053
DESCRIPTION = "[name=netty_default, suggested_max_allocation_size="
5154
+ ByteSizeValue.ofBytes(SUGGESTED_MAX_ALLOCATION_SIZE)
@@ -127,7 +130,12 @@ public class NettyAllocator {
127130
+ g1gcRegionSize
128131
+ "}]";
129132
}
130-
ALLOCATOR = new NoDirectBuffers(delegate);
133+
allocator = new NoDirectBuffers(delegate);
134+
}
135+
if (Assertions.ENABLED) {
136+
ALLOCATOR = new TrashingByteBufAllocator(allocator);
137+
} else {
138+
ALLOCATOR = allocator;
131139
}
132140

133141
RECYCLER = new Recycler<>() {
@@ -353,4 +361,105 @@ public ByteBufAllocator getDelegate() {
353361
return delegate;
354362
}
355363
}
364+
365+
static class TrashingByteBuf extends WrappedByteBuf {
366+
367+
private boolean trashed = false;
368+
369+
protected TrashingByteBuf(ByteBuf buf) {
370+
super(buf);
371+
}
372+
373+
@Override
374+
public boolean release() {
375+
if (refCnt() == 1) {
376+
// see [NOTE on racy trashContent() calls]
377+
trashContent();
378+
}
379+
return super.release();
380+
}
381+
382+
@Override
383+
public boolean release(int decrement) {
384+
if (refCnt() == decrement && refCnt() > 0) {
385+
// see [NOTE on racy trashContent() calls]
386+
trashContent();
387+
}
388+
return super.release(decrement);
389+
}
390+
391+
// [NOTE on racy trashContent() calls]: We trash the buffer content _before_ reducing the ref
392+
// count to zero, which looks racy because in principle a concurrent caller could come along
393+
// and successfully retain() this buffer to keep it alive after it's been trashed. Such a
394+
// caller would sometimes get an IllegalReferenceCountException ofc but that's something it
395+
// could handle - see for instance org.elasticsearch.transport.netty4.Netty4Utils.ByteBufRefCounted.tryIncRef.
396+
// Yet in practice this should never happen, we only ever retain() these buffers while we
397+
// know them to be alive (i.e. via RefCounted#mustIncRef or its moral equivalents) so it'd
398+
// be a bug for a caller to retain() a buffer whose ref count is heading to zero and whose
399+
// contents we've already decided to trash.
400+
private void trashContent() {
401+
if (trashed == false) {
402+
trashed = true;
403+
TrashingByteBufAllocator.trashBuffer(buf);
404+
}
405+
}
406+
}
407+
408+
static class TrashingCompositeByteBuf extends CompositeByteBuf {
409+
410+
TrashingCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
411+
super(alloc, direct, maxNumComponents);
412+
}
413+
414+
@Override
415+
protected void deallocate() {
416+
TrashingByteBufAllocator.trashBuffer(this);
417+
super.deallocate();
418+
}
419+
}
420+
421+
static class TrashingByteBufAllocator extends NoDirectBuffers {
422+
423+
static int DEFAULT_MAX_COMPONENTS = 16;
424+
425+
static void trashBuffer(ByteBuf buf) {
426+
for (var nioBuf : buf.nioBuffers()) {
427+
if (nioBuf.hasArray()) {
428+
var from = nioBuf.arrayOffset() + nioBuf.position();
429+
var to = from + nioBuf.remaining();
430+
Arrays.fill(nioBuf.array(), from, to, (byte) 0);
431+
}
432+
}
433+
}
434+
435+
TrashingByteBufAllocator(ByteBufAllocator delegate) {
436+
super(delegate);
437+
}
438+
439+
@Override
440+
public ByteBuf heapBuffer() {
441+
return new TrashingByteBuf(super.heapBuffer());
442+
}
443+
444+
@Override
445+
public ByteBuf heapBuffer(int initialCapacity) {
446+
return new TrashingByteBuf(super.heapBuffer(initialCapacity));
447+
}
448+
449+
@Override
450+
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
451+
return new TrashingByteBuf(super.heapBuffer(initialCapacity, maxCapacity));
452+
}
453+
454+
@Override
455+
public CompositeByteBuf compositeHeapBuffer() {
456+
return new TrashingCompositeByteBuf(this, false, DEFAULT_MAX_COMPONENTS);
457+
}
458+
459+
@Override
460+
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
461+
return new TrashingCompositeByteBuf(this, false, maxNumComponents);
462+
}
463+
464+
}
356465
}

0 commit comments

Comments
 (0)