Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
## JVM temporary directory
-Djava.io.tmpdir=${ES_TMPDIR}

-Dio.netty.leakDetection.level=disabled

# Leverages accelerated vector hardware instructions; removing this may
# result in less optimal vector performance
20-:--add-modules=jdk.incubator.vector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.recycler.VariableRecycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
Expand Down Expand Up @@ -113,7 +112,7 @@ public Netty4Transport(
}

@Override
protected Recycler<BytesRef> createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) {
protected VariableRecycler createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) {
return Netty4Utils.createRecycler(settings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.recycler.VariableRecycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
Expand Down Expand Up @@ -182,7 +182,7 @@ public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {
return new HttpBody.ByteRefHttpBody(toReleasableBytesReference(buf));
}

public static Recycler<BytesRef> createRecycler(Settings settings) {
public static VariableRecycler createRecycler(Settings settings) {
// If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
// setting the processors. We must do it ourselves first just in case.
setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.recycler.VariableRecycler;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.Assertions;
Expand All @@ -37,9 +39,10 @@ public class NettyAllocator {
private static final Logger logger = LogManager.getLogger(NettyAllocator.class);
private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false);

private static final long SUGGESTED_MAX_ALLOCATION_SIZE;
private static final int SUGGESTED_MAX_ALLOCATION_SIZE;
private static final boolean POOLED;
private static final ByteBufAllocator ALLOCATOR;
private static final Recycler<BytesRef> RECYCLER;
private static final VariableRecycler RECYCLER;
private static final String DESCRIPTION;

private static final String USE_UNPOOLED = "es.use_unpooled_allocator";
Expand All @@ -54,6 +57,7 @@ public class NettyAllocator {
DESCRIPTION = "[name=netty_default, suggested_max_allocation_size="
+ ByteSizeValue.ofBytes(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_netty_default_allocator=true}]";
POOLED = true;
} else {
final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
final boolean g1gcEnabled = useG1GC();
Expand All @@ -68,7 +72,7 @@ public class NettyAllocator {
if (g1gcEnabled && g1gcRegionSizeIsKnown) {
// Suggested max allocation size 1/4 of region size. Guard against unknown edge cases
// where this value would be less than 256KB.
SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024);
SUGGESTED_MAX_ALLOCATION_SIZE = Math.toIntExact(Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024));
} else {
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
}
Expand All @@ -83,6 +87,7 @@ public class NettyAllocator {
+ ", heap_size="
+ heapSize
+ "}]";
POOLED = false;
} else {
int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
int pageSize;
Expand Down Expand Up @@ -130,7 +135,9 @@ public class NettyAllocator {
+ ", g1gc_region_size="
+ g1gcRegionSize
+ "}]";
POOLED = true;
}
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
allocator = new NoDirectBuffers(delegate);
}
if (Assertions.ENABLED) {
Expand All @@ -139,35 +146,58 @@ public class NettyAllocator {
ALLOCATOR = allocator;
}

RECYCLER = new Recycler<>() {
@Override
public Recycler.V<BytesRef> obtain() {
ByteBuf byteBuf = ALLOCATOR.heapBuffer(PageCacheRecycler.BYTE_PAGE_SIZE, PageCacheRecycler.BYTE_PAGE_SIZE);
assert byteBuf.hasArray();
BytesRef bytesRef = new BytesRef(byteBuf.array(), byteBuf.arrayOffset(), byteBuf.capacity());
return new Recycler.V<>() {
@Override
public BytesRef v() {
return bytesRef;
}
assert isPowerOfTwo(SUGGESTED_MAX_ALLOCATION_SIZE);
RECYCLER = new NettyPageRecycler(PageCacheRecycler.BYTE_PAGE_SIZE);
}

@Override
public boolean isRecycled() {
return true;
}
private static boolean isPowerOfTwo(int n) {
return n > 0 && (n & (n - 1)) == 0;
}

@Override
public void close() {
byteBuf.release();
}
};
}
private record NettyPageRecycler(int pageSize) implements VariableRecycler {

@Override
public int pageSize() {
return PageCacheRecycler.BYTE_PAGE_SIZE;
}
};
@Override
public Recycler<BytesRef> requestRecyclerForPageSize(int requestedPageSize) {
int providedPageSize = ALLOCATOR.calculateNewCapacity(
Math.min(SUGGESTED_MAX_ALLOCATION_SIZE, requestedPageSize),
SUGGESTED_MAX_ALLOCATION_SIZE
);
return new NettyPageRecycler(providedPageSize);
}

@Override
public boolean recyclerEnabled() {
return POOLED == false;
}

@Override
public V<BytesRef> obtain() {
ByteBuf byteBuf = ALLOCATOR.heapBuffer(pageSize, pageSize);
assert byteBuf.hasArray();
BytesRef bytesRef = new BytesRef(byteBuf.array(), byteBuf.arrayOffset(), byteBuf.capacity());
return new V<>() {
@Override
public BytesRef v() {
return bytesRef;
}

@Override
public boolean isRecycled() {
return true;
}

@Override
public void close() {
byteBuf.release();

}
};
}

@Override
public int pageSize() {
return pageSize;
}
}

@SuppressForbidden(
Expand All @@ -187,7 +217,7 @@ public static ByteBufAllocator getAllocator() {
return ALLOCATOR;
}

public static Recycler<BytesRef> getRecycler() {
public static VariableRecycler getRecycler() {
return RECYCLER;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,7 @@ public static final IndexShard newIndexShard(
indexService.getThreadPool(),
indexService.getThreadPoolMergeExecutorService(),
indexService.getBigArrays(),
indexService.getBytesRecycler(),
null,
Collections.emptyList(),
Arrays.asList(listeners),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.recycler;

import org.apache.lucene.util.BytesRef;

public interface VariableRecycler extends Recycler<BytesRef> {

Recycler<BytesRef> requestRecyclerForPageSize(int pageSize);

boolean recyclerEnabled();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.util;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;

import java.util.function.IntFunction;

public class BytePageRecycler implements Recycler<BytesRef> {

private final VarPageRecycler varPageRecycler;
private final PageCacheRecycler pageCacheRecycler;

public BytePageRecycler(@Nullable VarPageRecycler varPageRecycler, PageCacheRecycler pageCacheRecycler) {
this.varPageRecycler = varPageRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}

public Recycler.V<BytesRef> getPage(int bytesRequested) {
if (varPageRecycler != null) {
return varPageRecycler.apply(bytesRequested);
} else {
return obtain();
}
}

@Override
public V<BytesRef> obtain() {
if (varPageRecycler != null) {
return varPageRecycler.obtain();
} else {
V<byte[]> v = pageCacheRecycler.bytePage(false);
return new Page(v.v(), 0, PageCacheRecycler.BYTE_PAGE_SIZE, v.isRecycled(), v);
}
}

@Override
public int pageSize() {
return PageCacheRecycler.BYTE_PAGE_SIZE;
}

public interface VarPageRecycler extends Recycler<BytesRef>, IntFunction<Page> {}

public static class Page implements V<BytesRef> {

private final Releasable releasable;
private final BytesRef bytesRef;
private final boolean isRecycled;

private Page(byte[] bytes, int offset, int length, boolean isRecycled, Releasable releasable) {
this.isRecycled = isRecycled;
this.bytesRef = new BytesRef(bytes, offset, length);
this.releasable = releasable;
}

@Override
public BytesRef v() {
return bytesRef;
}

@Override
public boolean isRecycled() {
return isRecycled;
}

@Override
public void close() {
releasable.close();
}
}
}
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.recycler.VariableRecycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -82,6 +83,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* IndexModule represents the central extension point for index level custom implementations like:
Expand Down Expand Up @@ -483,6 +485,7 @@ public IndexService newIndexService(
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
Supplier<VariableRecycler> bytesRecycler,
ThreadPool threadPool,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
ScriptService scriptService,
Expand Down Expand Up @@ -537,6 +540,7 @@ public IndexService newIndexService(
engineFactory,
circuitBreakerService,
bigArrays,
bytesRecycler,
threadPool,
threadPoolMergeExecutorService,
scriptService,
Expand Down
Loading