From 44f89e522cff15b172f1b07daaff753efa360724 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 8 Oct 2025 10:56:30 -0600 Subject: [PATCH 1/9] WIP --- .../common/util/BytePageRecycler.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/common/util/BytePageRecycler.java diff --git a/server/src/main/java/org/elasticsearch/common/util/BytePageRecycler.java b/server/src/main/java/org/elasticsearch/common/util/BytePageRecycler.java new file mode 100644 index 0000000000000..bad8407bf96d7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/BytePageRecycler.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.util; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; + +import java.util.function.IntFunction; + +public class BytePageRecycler implements Recycler { + + private final VarPageRecycler varPageRecycler; + private final PageCacheRecycler pageCacheRecycler; + + public BytePageRecycler(@Nullable VarPageRecycler varPageRecycler, PageCacheRecycler pageCacheRecycler) { + this.varPageRecycler = varPageRecycler; + this.pageCacheRecycler = pageCacheRecycler; + } + + public Recycler.V getPage(int bytesRequested) { + if (varPageRecycler != null) { + return varPageRecycler.apply(bytesRequested); + } else { + return obtain(); + } + } + + @Override + public V obtain() { + if (varPageRecycler != null) { + return varPageRecycler.obtain(); + } else { + V v = pageCacheRecycler.bytePage(false); + return new Page(v.v(), 0, PageCacheRecycler.BYTE_PAGE_SIZE, v.isRecycled(), v); + } + } + + @Override + public int pageSize() { + return PageCacheRecycler.BYTE_PAGE_SIZE; + } + + public interface VarPageRecycler extends Recycler, IntFunction {} + + public static class Page implements V { + + private final Releasable releasable; + private final BytesRef bytesRef; + private final boolean isRecycled; + + private Page(byte[] bytes, int offset, int length, boolean isRecycled, Releasable releasable) { + this.isRecycled = isRecycled; + this.bytesRef = new BytesRef(bytes, offset, length); + this.releasable = releasable; + } + + @Override + public BytesRef v() { + return bytesRef; + } + + @Override + public boolean isRecycled() { + return isRecycled; + } + + @Override + public void close() { + releasable.close(); + } + } +} From 2a7a6f1b72213b689e1de5d13c9cb352fd2d2091 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 8 Oct 2025 12:47:35 -0600 Subject: [PATCH 2/9] Change --- .../transport/netty4/Netty4Transport.java | 5 +- .../transport/netty4/Netty4Utils.java | 4 +- .../transport/netty4/NettyAllocator.java | 88 ++++++++++++------- .../index/shard/IndexShardIT.java | 1 + .../common/recycler/VariableRecycler.java | 19 ++++ .../org/elasticsearch/index/IndexModule.java | 3 + .../org/elasticsearch/index/IndexService.java | 9 ++ .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/translog/Translog.java | 13 +-- .../index/translog/TranslogConfig.java | 25 +++--- .../index/translog/TranslogWriter.java | 14 +-- .../translog/TruncateTranslogAction.java | 4 +- .../elasticsearch/indices/IndicesService.java | 4 + .../indices/IndicesServiceBuilder.java | 8 ++ .../elasticsearch/node/NodeConstruction.java | 1 + .../transport/BytesRefRecycler.java | 13 ++- .../elasticsearch/transport/TcpTransport.java | 12 ++- .../elasticsearch/transport/Transport.java | 5 ++ .../transport/TransportService.java | 5 ++ .../elasticsearch/index/IndexModuleTests.java | 2 + .../index/engine/InternalEngineTests.java | 8 +- .../index/shard/RefreshListenersTests.java | 3 +- .../translog/TranslogDeletionPolicyTests.java | 4 +- .../index/translog/TranslogTests.java | 7 +- .../snapshots/SnapshotResiliencyTests.java | 1 + .../index/engine/EngineTestCase.java | 12 ++- .../index/shard/IndexShardTestCase.java | 2 + .../index/engine/FollowingEngineTests.java | 3 +- 28 files changed, 194 insertions(+), 85 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/recycler/VariableRecycler.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 14607f563f1c6..00ac011f0917c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,14 +25,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.ThreadWatchdog; -import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.PageCacheRecycler; @@ -113,7 +112,7 @@ public Netty4Transport( } @Override - protected Recycler createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) { + protected VariableRecycler createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) { return Netty4Utils.createRecycler(settings); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 81b4fd3fbb9ee..4c6be1cb77e8a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Booleans; @@ -182,7 +182,7 @@ public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) { return new HttpBody.ByteRefHttpBody(toReleasableBytesReference(buf)); } - public static Recycler createRecycler(Settings settings) { + public static VariableRecycler createRecycler(Settings settings) { // If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals // setting the processors. We must do it ourselves first just in case. setAvailableProcessors(EsExecutors.allocatedProcessors(settings)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java index 267d8091b08ee..60c1186c41972 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.Assertions; @@ -37,9 +38,10 @@ public class NettyAllocator { private static final Logger logger = LogManager.getLogger(NettyAllocator.class); private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false); - private static final long SUGGESTED_MAX_ALLOCATION_SIZE; + private static final int SUGGESTED_MAX_ALLOCATION_SIZE; + private static final boolean POOLED; private static final ByteBufAllocator ALLOCATOR; - private static final Recycler RECYCLER; + private static final VariableRecycler RECYCLER; private static final String DESCRIPTION; private static final String USE_UNPOOLED = "es.use_unpooled_allocator"; @@ -54,6 +56,7 @@ public class NettyAllocator { DESCRIPTION = "[name=netty_default, suggested_max_allocation_size=" + ByteSizeValue.ofBytes(SUGGESTED_MAX_ALLOCATION_SIZE) + ", factors={es.unsafe.use_netty_default_allocator=true}]"; + POOLED = true; } else { final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); final boolean g1gcEnabled = useG1GC(); @@ -68,7 +71,7 @@ public class NettyAllocator { if (g1gcEnabled && g1gcRegionSizeIsKnown) { // Suggested max allocation size 1/4 of region size. Guard against unknown edge cases // where this value would be less than 256KB. - SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024); + SUGGESTED_MAX_ALLOCATION_SIZE = Math.toIntExact(Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024)); } else { SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024; } @@ -83,6 +86,7 @@ public class NettyAllocator { + ", heap_size=" + heapSize + "}]"; + POOLED = false; } else { int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena(); int pageSize; @@ -130,6 +134,7 @@ public class NettyAllocator { + ", g1gc_region_size=" + g1gcRegionSize + "}]"; + POOLED = true; } allocator = new NoDirectBuffers(delegate); } @@ -139,35 +144,58 @@ public class NettyAllocator { ALLOCATOR = allocator; } - RECYCLER = new Recycler<>() { - @Override - public Recycler.V obtain() { - ByteBuf byteBuf = ALLOCATOR.heapBuffer(PageCacheRecycler.BYTE_PAGE_SIZE, PageCacheRecycler.BYTE_PAGE_SIZE); - assert byteBuf.hasArray(); - BytesRef bytesRef = new BytesRef(byteBuf.array(), byteBuf.arrayOffset(), byteBuf.capacity()); - return new Recycler.V<>() { - @Override - public BytesRef v() { - return bytesRef; - } + assert isPowerOfTwo(SUGGESTED_MAX_ALLOCATION_SIZE); + RECYCLER = new NettyPageRecycler(PageCacheRecycler.BYTE_PAGE_SIZE); + } - @Override - public boolean isRecycled() { - return true; - } + private static boolean isPowerOfTwo(int n) { + return n > 0 && (n & (n - 1)) == 0; + } - @Override - public void close() { - byteBuf.release(); - } - }; - } + private record NettyPageRecycler(int pageSize) implements VariableRecycler { - @Override - public int pageSize() { - return PageCacheRecycler.BYTE_PAGE_SIZE; - } - }; + @Override + public Recycler requestRecyclerForPageSize(int requestedPageSize) { + int providedPageSize = ALLOCATOR.calculateNewCapacity( + Math.min(SUGGESTED_MAX_ALLOCATION_SIZE, requestedPageSize), + SUGGESTED_MAX_ALLOCATION_SIZE + ); + return new NettyPageRecycler(providedPageSize); + } + + @Override + public boolean recyclerEnabled() { + return POOLED == false; + } + + @Override + public V obtain() { + ByteBuf byteBuf = ALLOCATOR.heapBuffer(pageSize, pageSize); + assert byteBuf.hasArray(); + BytesRef bytesRef = new BytesRef(byteBuf.array(), byteBuf.arrayOffset(), byteBuf.capacity()); + return new V<>() { + @Override + public BytesRef v() { + return bytesRef; + } + + @Override + public boolean isRecycled() { + return true; + } + + @Override + public void close() { + byteBuf.release(); + + } + }; + } + + @Override + public int pageSize() { + return pageSize; + } } @SuppressForbidden( @@ -187,7 +215,7 @@ public static ByteBufAllocator getAllocator() { return ALLOCATOR; } - public static Recycler getRecycler() { + public static VariableRecycler getRecycler() { return RECYCLER; } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f51e7209314ba..ac0b8cde5f10b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -800,6 +800,7 @@ public static final IndexShard newIndexShard( indexService.getThreadPool(), indexService.getThreadPoolMergeExecutorService(), indexService.getBigArrays(), + indexService.getBytesRecycler(), null, Collections.emptyList(), Arrays.asList(listeners), diff --git a/server/src/main/java/org/elasticsearch/common/recycler/VariableRecycler.java b/server/src/main/java/org/elasticsearch/common/recycler/VariableRecycler.java new file mode 100644 index 0000000000000..be9f24cebda22 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/recycler/VariableRecycler.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.recycler; + +import org.apache.lucene.util.BytesRef; + +public interface VariableRecycler extends Recycler { + + Recycler requestRecyclerForPageSize(int pageSize); + + boolean recyclerEnabled(); +} diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 42ab9ae362509..ee64d6e6956bb 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -483,6 +484,7 @@ public IndexService newIndexService( IndexService.ShardStoreDeleter shardStoreDeleter, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, + VariableRecycler bytesRecycler, ThreadPool threadPool, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, @@ -537,6 +539,7 @@ public IndexService newIndexService( engineFactory, circuitBreakerService, bigArrays, + bytesRecycler, threadPool, threadPoolMergeExecutorService, scriptService, diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 9af701fa81642..9a676b997dce7 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -162,6 +163,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Nullable private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final BigArrays bigArrays; + private final VariableRecycler bytesRecycler; private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; @@ -187,6 +189,7 @@ public IndexService( EngineFactory engineFactory, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, + VariableRecycler bytesRecycler, ThreadPool threadPool, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, @@ -274,6 +277,7 @@ public IndexService( this.shardStoreDeleter = shardStoreDeleter; this.indexFoldersDeletionListener = indexFoldersDeletionListener; this.bigArrays = bigArrays; + this.bytesRecycler = bytesRecycler; this.threadPool = threadPool; this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; this.scriptService = scriptService; @@ -581,6 +585,7 @@ public synchronized IndexShard createShard( threadPool, threadPoolMergeExecutorService, bigArrays, + bytesRecycler, engineWarmer, searchOperationListeners, indexingOperationListeners, @@ -861,6 +866,10 @@ public BigArrays getBigArrays() { return bigArrays; } + public VariableRecycler getBytesRecycler() { + return bytesRecycler; + } + /** * The {@link ScriptService} to use for this index. */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 451331e6e27c0..ecfcf9fdfe164 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.CollectionUtils; @@ -336,6 +337,7 @@ public IndexShard( final ThreadPool threadPool, final ThreadPoolMergeExecutorService threadPoolMergeExecutorService, final BigArrays bigArrays, + final VariableRecycler bytesRecycler, final Engine.Warmer warmer, final List searchOperationListener, final List listeners, @@ -393,7 +395,7 @@ public IndexShard( logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); - this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); + this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bytesRecycler); final String aId = shardRouting.allocationId().getId(); final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id()); this.pendingPrimaryTerm = primaryTerm; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index a5088b5896c80..2b3c3d162fd29 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.lookup.Source; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.XContentParserConfiguration; import java.io.Closeable; @@ -124,7 +125,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); - private final BigArrays bigArrays; + private final VariableRecycler bytesRecycler; private final DiskIoBufferPool diskIoBufferPool; protected final Lock readLock; protected final Lock writeLock; @@ -180,7 +181,7 @@ public Translog( this.operationAsserter = operationAsserter; this.deletionPolicy = deletionPolicy; this.translogUUID = translogUUID; - this.bigArrays = config.getBigArrays(); + this.bytesRecycler = config.getBytesRecycler(); this.diskIoBufferPool = config.getDiskIoBufferPool(); var rwl = new ReentrantReadWriteLock(); this.readLock = rwl.readLock(); @@ -603,7 +604,7 @@ TranslogWriter createWriter( primaryTermSupplier.getAsLong(), tragedy, persistedSequenceNumberConsumer, - bigArrays, + bytesRecycler, diskIoBufferPool, operationListener, operationAsserter, @@ -623,7 +624,7 @@ TranslogWriter createWriter( * @throws IOException if adding the operation to the translog resulted in an I/O exception */ public Location add(final Operation operation) throws IOException { - try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler())) { + try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(bytesRecycler)) { writeHeaderWithSize(out, operation); final BytesReference header = out.bytes(); Serialized serialized = Serialized.create( @@ -2162,7 +2163,7 @@ public static String createEmptyTranslog( seqNo -> { throw new UnsupportedOperationException(); }, - BigArrays.NON_RECYCLING_INSTANCE, + BytesRefRecycler.NON_RECYCLING_INSTANCE, DiskIoBufferPool.INSTANCE, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index 4af0c0ad58ab4..55978d3debcfa 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -10,9 +10,9 @@ package org.elasticsearch.index.translog; import org.elasticsearch.common.io.DiskIoBufferPool; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.shard.ShardId; @@ -33,7 +33,7 @@ public final class TranslogConfig { private final ShardId shardId; private final Path translogPath; private final IndexSettings indexSettings; - private final BigArrays bigArrays; + private final VariableRecycler bytesRecycler; private final ByteSizeValue bufferSize; private final DiskIoBufferPool diskIoBufferPool; private final OperationListener operationListener; @@ -44,14 +44,14 @@ public final class TranslogConfig { * @param shardId the shard ID this translog belongs to * @param translogPath the path to use for the transaction log files * @param indexSettings the index settings used to set internal variables - * @param bigArrays a bigArrays instance used for temporarily allocating write operations + * @param bytesRecycler a bytesRecycler instance used for temporarily allocating write operations */ - public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) { + public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, VariableRecycler bytesRecycler) { this( shardId, translogPath, indexSettings, - bigArrays, + bytesRecycler, DEFAULT_BUFFER_SIZE, DiskIoBufferPool.INSTANCE, NOOP_OPERATION_LISTENER, @@ -63,19 +63,19 @@ public TranslogConfig( ShardId shardId, Path translogPath, IndexSettings indexSettings, - BigArrays bigArrays, + VariableRecycler bytesRecycler, ByteSizeValue bufferSize, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener ) { - this(shardId, translogPath, indexSettings, bigArrays, bufferSize, diskIoBufferPool, operationListener, true); + this(shardId, translogPath, indexSettings, bytesRecycler, bufferSize, diskIoBufferPool, operationListener, true); } public TranslogConfig( ShardId shardId, Path translogPath, IndexSettings indexSettings, - BigArrays bigArrays, + VariableRecycler bytesRecycler, ByteSizeValue bufferSize, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, @@ -85,7 +85,7 @@ public TranslogConfig( this.indexSettings = indexSettings; this.shardId = shardId; this.translogPath = translogPath; - this.bigArrays = bigArrays; + this.bytesRecycler = bytesRecycler; this.diskIoBufferPool = diskIoBufferPool; this.operationListener = operationListener; this.fsync = fsync; @@ -105,11 +105,8 @@ public ShardId getShardId() { return shardId; } - /** - * Returns a BigArrays instance for this engine - */ - public BigArrays getBigArrays() { - return bigArrays; + public VariableRecycler getBytesRecycler() { + return bytesRecycler; } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 7591be5a87aca..66e9e314980fd 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -18,8 +18,8 @@ import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; @@ -51,7 +51,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; private final FileChannel checkpointChannel; private final Path checkpointPath; - private final BigArrays bigArrays; + private final VariableRecycler bytesRecycler; // the last checkpoint that was written when the translog was last synced private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ @@ -107,7 +107,7 @@ private TranslogWriter( TranslogHeader header, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, + VariableRecycler bytesRecycler, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -134,7 +134,7 @@ private TranslogWriter( assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer; - this.bigArrays = bigArrays; + this.bytesRecycler = bytesRecycler; this.diskIoBufferPool = diskIoBufferPool; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.tragedy = tragedy; @@ -158,7 +158,7 @@ public static TranslogWriter create( long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, + VariableRecycler bytesRecycler, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -203,7 +203,7 @@ public static TranslogWriter create( header, tragedy, persistedSequenceNumberConsumer, - bigArrays, + bytesRecycler, diskIoBufferPool, operationListener, operationAsserter, @@ -245,7 +245,7 @@ public Translog.Location add(final Translog.Serialized operation, final long seq synchronized (this) { ensureOpen(); if (buffer == null) { - buffer = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler()); + buffer = new RecyclerBytesStreamOutput(bytesRecycler); } assert bufferedBytes == buffer.size(); final long offset = totalOffset; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index d99d42cd2f10b..5148e303f73b4 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; @@ -29,6 +28,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.transport.BytesRefRecycler; import java.io.IOException; import java.nio.channels.FileChannel; @@ -161,7 +161,7 @@ private static boolean isTranslogClean(ShardPath shardPath, ClusterState cluster shardPath.getShardId(), translogPath, indexSettings, - BigArrays.NON_RECYCLING_INSTANCE + BytesRefRecycler.NON_RECYCLING_INSTANCE ); long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id()); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 551217aeb7bea..a0167fae5d36e 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -59,6 +59,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -243,6 +244,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final CircuitBreakerService circuitBreakerService; private final BigArrays bigArrays; + private final VariableRecycler bytesRecycler; private final ScriptService scriptService; private final ClusterService clusterService; private final ProjectResolver projectResolver; @@ -320,6 +322,7 @@ protected void doStart() { this.indexScopedSettings = builder.indexScopedSettings; this.circuitBreakerService = builder.circuitBreakerService; this.bigArrays = builder.bigArrays; + this.bytesRecycler = builder.bytesRecycler; this.scriptService = builder.scriptService; this.clusterService = builder.clusterService; this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( @@ -822,6 +825,7 @@ private synchronized IndexService createIndexService( this, circuitBreakerService, bigArrays, + bytesRecycler, threadPool, threadPoolMergeExecutorService, scriptService, diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java index 3b7f4d24869f2..dc4c2546d10f4 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -65,6 +66,7 @@ public class IndicesServiceBuilder { IndexScopedSettings indexScopedSettings; CircuitBreakerService circuitBreakerService; BigArrays bigArrays; + VariableRecycler bytesRecycler; ScriptService scriptService; ClusterService clusterService; ProjectResolver projectResolver; @@ -166,6 +168,11 @@ public IndicesServiceBuilder bigArrays(BigArrays bigArrays) { return this; } + public IndicesServiceBuilder bytesRecycler(VariableRecycler bytesRecycler) { + this.bytesRecycler = bytesRecycler; + return this; + } + public IndicesServiceBuilder scriptService(ScriptService scriptService) { this.scriptService = scriptService; return this; @@ -240,6 +247,7 @@ public IndicesService build() { Objects.requireNonNull(indexScopedSettings); Objects.requireNonNull(circuitBreakerService); Objects.requireNonNull(bigArrays); + Objects.requireNonNull(bytesRecycler); Objects.requireNonNull(scriptService); Objects.requireNonNull(clusterService); Objects.requireNonNull(projectResolver); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 201bf648439c5..d190d5dd87767 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -939,6 +939,7 @@ public Map queryFields() { .indexScopedSettings(settingsModule.getIndexScopedSettings()) .circuitBreakerService(circuitBreakerService) .bigArrays(bigArrays) + .bytesRecycler(bigArrays.bytesRefRecycler()) .scriptService(scriptService) .clusterService(clusterService) .projectResolver(projectResolver) diff --git a/server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java b/server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java index c939604488311..f578f9e2c3a3c 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java @@ -11,9 +11,10 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.util.PageCacheRecycler; -public class BytesRefRecycler implements Recycler { +public class BytesRefRecycler implements VariableRecycler { public static final BytesRefRecycler NON_RECYCLING_INSTANCE = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE); @@ -23,6 +24,16 @@ public BytesRefRecycler(PageCacheRecycler recycler) { this.recycler = recycler; } + @Override + public Recycler requestRecyclerForPageSize(int pageSize) { + return this; + } + + @Override + public boolean recyclerEnabled() { + return true; + } + @Override public Recycler.V obtain() { Recycler.V v = recycler.bytePage(false); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 24570c544bdea..e900e655543c2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; @@ -32,7 +31,7 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; -import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -111,7 +110,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final Settings settings; protected final ThreadPool threadPool; - protected final Recycler recycler; + protected final VariableRecycler recycler; protected final NetworkService networkService; protected final Set profileSettingsSet; protected final boolean rstOnClose; @@ -357,7 +356,7 @@ protected static ConnectionProfile maybeOverrideConnectionProfile(ConnectionProf return connectionProfile; } - protected Recycler createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) { + protected VariableRecycler createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) { return new BytesRefRecycler(pageCacheRecycler); } @@ -991,6 +990,11 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { return new RecyclerBytesStreamOutput(recycler); } + @Override + public VariableRecycler variableRecycler() { + return recycler; + } + /** * Ensures this transport is still started / open * diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 13b2752c929bb..b70092e09a589 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.Maps; @@ -92,6 +93,10 @@ default RecyclerBytesStreamOutput newNetworkBytesStream() { return new RecyclerBytesStreamOutput(NON_RECYCLING_INSTANCE); } + default VariableRecycler variableRecycler() { + return NON_RECYCLING_INSTANCE; + } + /** * A unidirectional connection to a {@link DiscoveryNode} */ diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 785a56a41ddbe..83e1a5f3ba62b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -655,6 +656,10 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { return transport.newNetworkBytesStream(); } + public VariableRecycler variableRecycler() { + return transport.variableRecycler(); + } + static class HandshakeRequest extends AbstractTransportRequest { public static final HandshakeRequest INSTANCE = new HandshakeRequest(); diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 088c748bde5f6..314a7b7adfd72 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -102,6 +102,7 @@ import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.hamcrest.Matchers; import java.io.Closeable; @@ -233,6 +234,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { deleter, circuitBreakerService, bigArrays, + BytesRefRecycler.NON_RECYCLING_INSTANCE, threadPool, threadPoolMergeExecutorService, scriptService, diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fa951d6473153..7ce9ab365221a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -91,7 +91,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.CheckedRunnable; @@ -138,6 +137,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.XContentType; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -3585,7 +3585,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final Path badTranslogLog = createTempDir(); final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); Translog translog = new Translog( - new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), + new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BytesRefRecycler.NON_RECYCLING_INSTANCE), badUUID, new TranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, @@ -3603,7 +3603,7 @@ public void testRecoverFromForeignTranslog() throws IOException { shardId, translog.location(), config.getIndexSettings(), - BigArrays.NON_RECYCLING_INSTANCE + BytesRefRecycler.NON_RECYCLING_INSTANCE ); EngineConfig brokenConfig = new EngineConfig( @@ -7214,7 +7214,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getTranslogConfig().getShardId(), createTempDir(), config.getTranslogConfig().getIndexSettings(), - config.getTranslogConfig().getBigArrays() + config.getTranslogConfig().getBytesRecycler() ); EngineConfig configWithWarmer = new EngineConfig( config.getShardId(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index bf6a4f4ec2d56..dae95675a4ba0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.XContentType; import org.junit.After; import org.junit.Before; @@ -131,7 +132,7 @@ public void setupListeners() throws Exception { shardId, createTempDir("translog"), indexSettings, - BigArrays.NON_RECYCLING_INSTANCE + BytesRefRecycler.NON_RECYCLING_INSTANCE ); Engine.EventListener eventListener = new Engine.EventListener() { @Override diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 5b3f964813905..62188ab1afce6 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -12,13 +12,13 @@ import org.apache.lucene.store.ByteArrayDataOutput; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.engine.TranslogOperationAsserter; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.BytesRefRecycler; import org.mockito.Mockito; import java.io.IOException; @@ -93,7 +93,7 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, - BigArrays.NON_RECYCLING_INSTANCE, + BytesRefRecycler.NON_RECYCLING_INSTANCE, TranslogTests.RANDOMIZING_IO_BUFFERS, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 62e609c5b9925..2ea33409e427e 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -124,7 +124,6 @@ import java.util.stream.LongStream; import java.util.stream.Stream; -import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogOperationsUtils.indexOp; import static org.hamcrest.CoreMatchers.hasItem; @@ -301,7 +300,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting shardId, path, indexSettings, - NON_RECYCLING_INSTANCE, + BytesRefRecycler.NON_RECYCLING_INSTANCE, bufferSize, randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS, Objects.requireNonNullElse(listener, (d, s, l) -> {}), @@ -1398,7 +1397,7 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { temp.getShardId(), temp.getTranslogPath(), temp.getIndexSettings(), - temp.getBigArrays(), + temp.getBytesRecycler(), ByteSizeValue.of(1, ByteSizeUnit.KB), randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS, TranslogConfig.NOOP_OPERATION_LISTENER, @@ -4086,7 +4085,7 @@ public void testDisabledFsync() throws IOException { shardId, translogDir, IndexSettingsModule.newIndexSettings(shardId.getIndex(), Settings.EMPTY), - NON_RECYCLING_INSTANCE, + BytesRefRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.of(1, ByteSizeUnit.KB), randomBoolean() ? DiskIoBufferPool.INSTANCE : RANDOMIZING_IO_BUFFERS, TranslogConfig.NOOP_OPERATION_LISTENER, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d6a57ba4587c5..39e554a8a52aa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2488,6 +2488,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { .indexScopedSettings(indexScopedSettings) .circuitBreakerService(new NoneCircuitBreakerService()) .bigArrays(bigArrays) + .bytesRecycler(bigArrays.bytesRefRecycler()) .scriptService(scriptService) .clusterService(clusterService) .projectResolver(DefaultProjectResolver.INSTANCE) diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index e552bb133add4..fd4817e370f61 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -109,6 +109,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -518,7 +519,7 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc } protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BytesRefRecycler.NON_RECYCLING_INSTANCE); String translogUUID = Translog.createEmptyTranslog( translogPath, SequenceNumbers.NO_OPS_PERFORMED, @@ -838,7 +839,12 @@ public EngineConfig config( final @Nullable Function indexDeletionPolicyWrapper ) { final IndexWriterConfig iwc = newIndexWriterConfig(); - final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final TranslogConfig translogConfig = new TranslogConfig( + shardId, + translogPath, + indexSettings, + BytesRefRecycler.NON_RECYCLING_INSTANCE + ); final Engine.EventListener eventListener = new Engine.EventListener() { }; // we don't need to notify anybody in this test final List extRefreshListenerList = externalRefreshListener == null @@ -912,7 +918,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .build() ); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BytesRefRecycler.NON_RECYCLING_INSTANCE); return new EngineConfig( config.getShardId(), config.getThreadPool(), diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 394134c978c79..28541f40226d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -94,6 +94,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.XContentType; import java.io.IOException; @@ -660,6 +661,7 @@ protected IndexShard newShard( threadPool, threadPoolMergeExecutorService, BigArrays.NON_RECYCLING_INSTANCE, + BytesRefRecycler.NON_RECYCLING_INSTANCE, warmer, soListener, Arrays.asList(listeners), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 93b3a00c1019a..0387ddfc57ad9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,6 +58,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; @@ -249,7 +250,7 @@ private EngineConfig engineConfig( shardIdValue, translogPath, indexSettings, - BigArrays.NON_RECYCLING_INSTANCE + BytesRefRecycler.NON_RECYCLING_INSTANCE ); final MapperService mapperService = EngineTestCase.createMapperService(); return new EngineConfig( From 7c7201621a1b3c2bf314359799ec5aca9ee49cdd Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 12:40:11 -0600 Subject: [PATCH 3/9] Change --- .../main/java/org/elasticsearch/index/IndexModule.java | 3 ++- .../main/java/org/elasticsearch/index/IndexService.java | 8 ++++---- .../java/org/elasticsearch/index/translog/Translog.java | 3 +++ .../org/elasticsearch/index/translog/TranslogWriter.java | 2 +- .../java/org/elasticsearch/indices/IndicesService.java | 3 ++- .../org/elasticsearch/indices/IndicesServiceBuilder.java | 5 +++-- .../java/org/elasticsearch/node/NodeConstruction.java | 6 +++++- .../java/org/elasticsearch/index/IndexModuleTests.java | 2 +- .../elasticsearch/snapshots/SnapshotResiliencyTests.java | 2 +- 9 files changed, 22 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index ee64d6e6956bb..1bac60bdcb8a9 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -83,6 +83,7 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; /** * IndexModule represents the central extension point for index level custom implementations like: @@ -484,7 +485,7 @@ public IndexService newIndexService( IndexService.ShardStoreDeleter shardStoreDeleter, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, - VariableRecycler bytesRecycler, + Supplier bytesRecycler, ThreadPool threadPool, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 9a676b997dce7..286f26f0ae950 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -163,7 +163,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Nullable private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final BigArrays bigArrays; - private final VariableRecycler bytesRecycler; + private final Supplier bytesRecycler; private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; @@ -189,7 +189,7 @@ public IndexService( EngineFactory engineFactory, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, - VariableRecycler bytesRecycler, + Supplier bytesRecycler, ThreadPool threadPool, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, @@ -585,7 +585,7 @@ public synchronized IndexShard createShard( threadPool, threadPoolMergeExecutorService, bigArrays, - bytesRecycler, + bytesRecycler.get(), engineWarmer, searchOperationListeners, indexingOperationListeners, @@ -867,7 +867,7 @@ public BigArrays getBigArrays() { } public VariableRecycler getBytesRecycler() { - return bytesRecycler; + return bytesRecycler.get(); } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 2b3c3d162fd29..762fdd66b2815 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.IOUtils; @@ -126,6 +127,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); private final VariableRecycler bytesRecycler; + private final Recycler headerRecycler; private final DiskIoBufferPool diskIoBufferPool; protected final Lock readLock; protected final Lock writeLock; @@ -182,6 +184,7 @@ public Translog( this.deletionPolicy = deletionPolicy; this.translogUUID = translogUUID; this.bytesRecycler = config.getBytesRecycler(); + this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(2048); this.diskIoBufferPool = config.getDiskIoBufferPool(); var rwl = new ReentrantReadWriteLock(); this.readLock = rwl.readLock(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 66e9e314980fd..2a2796d89e101 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -245,7 +245,7 @@ public Translog.Location add(final Translog.Serialized operation, final long seq synchronized (this) { ensureOpen(); if (buffer == null) { - buffer = new RecyclerBytesStreamOutput(bytesRecycler); + buffer = new RecyclerBytesStreamOutput(bytesRecycler.requestRecyclerForPageSize(256 * 1024)); } assert bufferedBytes == buffer.size(); final long offset = totalOffset; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index a0167fae5d36e..b941ed92af32c 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -188,6 +188,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -244,7 +245,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final CircuitBreakerService circuitBreakerService; private final BigArrays bigArrays; - private final VariableRecycler bytesRecycler; + private final Supplier bytesRecycler; private final ScriptService scriptService; private final ClusterService clusterService; private final ProjectResolver projectResolver; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java index dc4c2546d10f4..65b146f92b504 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java @@ -51,6 +51,7 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; public class IndicesServiceBuilder { @@ -66,7 +67,7 @@ public class IndicesServiceBuilder { IndexScopedSettings indexScopedSettings; CircuitBreakerService circuitBreakerService; BigArrays bigArrays; - VariableRecycler bytesRecycler; + Supplier bytesRecycler; ScriptService scriptService; ClusterService clusterService; ProjectResolver projectResolver; @@ -168,7 +169,7 @@ public IndicesServiceBuilder bigArrays(BigArrays bigArrays) { return this; } - public IndicesServiceBuilder bytesRecycler(VariableRecycler bytesRecycler) { + public IndicesServiceBuilder bytesRecycler(Supplier bytesRecycler) { this.bytesRecycler = bytesRecycler; return this; } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index d190d5dd87767..02394d091fb39 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -73,6 +73,7 @@ import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.recycler.VariableRecycler; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ConsistentSettingsService; import org.elasticsearch.common.settings.Setting; @@ -249,6 +250,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -760,6 +762,7 @@ private void construct( ); PageCacheRecycler pageCacheRecycler = serviceProvider.newPageCacheRecycler(pluginsService, settings); BigArrays bigArrays = serviceProvider.newBigArrays(pluginsService, pageCacheRecycler, circuitBreakerService); + final AtomicReference transportRecycler = new AtomicReference<>(); final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(telemetryProvider.getMeterRegistry()); @@ -939,7 +942,7 @@ public Map queryFields() { .indexScopedSettings(settingsModule.getIndexScopedSettings()) .circuitBreakerService(circuitBreakerService) .bigArrays(bigArrays) - .bytesRecycler(bigArrays.bytesRefRecycler()) + .bytesRecycler(transportRecycler::get) .scriptService(scriptService) .clusterService(clusterService) .projectResolver(projectResolver) @@ -1160,6 +1163,7 @@ public Map queryFields() { linkedProjectConfigService, projectResolver ); + transportRecycler.set(transportService.variableRecycler()); final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry()); final SearchTransportService searchTransportService = new SearchTransportService( transportService, diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 314a7b7adfd72..90b1a8da7865f 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -234,7 +234,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { deleter, circuitBreakerService, bigArrays, - BytesRefRecycler.NON_RECYCLING_INSTANCE, + () -> BytesRefRecycler.NON_RECYCLING_INSTANCE, threadPool, threadPoolMergeExecutorService, scriptService, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 39e554a8a52aa..1aaf0d511b094 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2488,7 +2488,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { .indexScopedSettings(indexScopedSettings) .circuitBreakerService(new NoneCircuitBreakerService()) .bigArrays(bigArrays) - .bytesRecycler(bigArrays.bytesRefRecycler()) + .bytesRecycler(bigArrays::bytesRefRecycler) .scriptService(scriptService) .clusterService(clusterService) .projectResolver(DefaultProjectResolver.INSTANCE) From 549e308cb0e660a3836087c34e49327642ae7215 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 13:37:15 -0600 Subject: [PATCH 4/9] Change --- distribution/src/config/jvm.options | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distribution/src/config/jvm.options b/distribution/src/config/jvm.options index f4cc3b1bf6191..5484ea15c9c56 100644 --- a/distribution/src/config/jvm.options +++ b/distribution/src/config/jvm.options @@ -54,6 +54,8 @@ ## JVM temporary directory -Djava.io.tmpdir=${ES_TMPDIR} +-Dio.netty.leakDetection.level=disabled + # Leverages accelerated vector hardware instructions; removing this may # result in less optimal vector performance 20-:--add-modules=jdk.incubator.vector From 84eab2c31a18149eb3fa84a572caf277a1d3165d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 14:16:33 -0600 Subject: [PATCH 5/9] disable --- .../org/elasticsearch/transport/netty4/NettyAllocator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java index 60c1186c41972..536706f433373 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java @@ -18,6 +18,8 @@ import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.ResourceLeakDetector; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; @@ -136,6 +138,7 @@ public class NettyAllocator { + "}]"; POOLED = true; } + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); allocator = new NoDirectBuffers(delegate); } if (Assertions.ENABLED) { From 2036502f977895670d472a1a6cdf6a130f821764 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 9 Oct 2025 20:24:42 +0000 Subject: [PATCH 6/9] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/transport/netty4/NettyAllocator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java index 536706f433373..8c9e6e58a50eb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java @@ -17,7 +17,6 @@ import io.netty.channel.Channel; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; - import io.netty.util.ResourceLeakDetector; import org.apache.logging.log4j.LogManager; From be24166768b4b7d4ef2dd1d712084b068f3b2047 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 14:51:44 -0600 Subject: [PATCH 7/9] ifd --- .../index/translog/SimplePool.java | 59 +++++++++++++++++++ .../index/translog/Translog.java | 5 +- 2 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/translog/SimplePool.java diff --git a/server/src/main/java/org/elasticsearch/index/translog/SimplePool.java b/server/src/main/java/org/elasticsearch/index/translog/SimplePool.java new file mode 100644 index 0000000000000..7951c67d842f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/SimplePool.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SimplePool implements Recycler { + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + public SimplePool(Settings settings) { + int size = EsExecutors.allocatedProcessors(settings); + for (int i = 0; i < size; i++) { + queue.add(new BytesRef(new byte[256])); + } + } + + @Override + public V obtain() { + BytesRef ref = queue.poll(); + final BytesRef finalRef = ref == null ? new BytesRef(new byte[256]) : ref; + final boolean pooled = ref == finalRef; + return new V<>() { + @Override + public BytesRef v() { + return finalRef; + } + + @Override + public boolean isRecycled() { + return pooled; + } + + @Override + public void close() { + if (pooled) { + queue.add(finalRef); + } + } + }; + } + + @Override + public int pageSize() { + return 256; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 762fdd66b2815..abd3d9c56d090 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -184,7 +184,8 @@ public Translog( this.deletionPolicy = deletionPolicy; this.translogUUID = translogUUID; this.bytesRecycler = config.getBytesRecycler(); - this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(2048); + this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(512); +// this.headerRecycler = new SimplePool(config.getIndexSettings().getSettings()); this.diskIoBufferPool = config.getDiskIoBufferPool(); var rwl = new ReentrantReadWriteLock(); this.readLock = rwl.readLock(); @@ -627,7 +628,7 @@ TranslogWriter createWriter( * @throws IOException if adding the operation to the translog resulted in an I/O exception */ public Location add(final Operation operation) throws IOException { - try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(bytesRecycler)) { + try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(headerRecycler)) { writeHeaderWithSize(out, operation); final BytesReference header = out.bytes(); Serialized serialized = Serialized.create( From 975ce75490910908cf6fca919d2219883cb11556 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 14:59:22 -0600 Subject: [PATCH 8/9] Change --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index abd3d9c56d090..adfe1daedce55 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -185,7 +185,7 @@ public Translog( this.translogUUID = translogUUID; this.bytesRecycler = config.getBytesRecycler(); this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(512); -// this.headerRecycler = new SimplePool(config.getIndexSettings().getSettings()); + // this.headerRecycler = new SimplePool(config.getIndexSettings().getSettings()); this.diskIoBufferPool = config.getDiskIoBufferPool(); var rwl = new ReentrantReadWriteLock(); this.readLock = rwl.readLock(); From 816ce7729576cca67ae0501376238c58c3797f12 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 9 Oct 2025 15:15:57 -0600 Subject: [PATCH 9/9] Change --- .../main/java/org/elasticsearch/index/translog/Translog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index adfe1daedce55..ac5e2cba9e273 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -184,8 +184,8 @@ public Translog( this.deletionPolicy = deletionPolicy; this.translogUUID = translogUUID; this.bytesRecycler = config.getBytesRecycler(); - this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(512); - // this.headerRecycler = new SimplePool(config.getIndexSettings().getSettings()); + // this.headerRecycler = bytesRecycler.requestRecyclerForPageSize(512); + this.headerRecycler = new SimplePool(config.getIndexSettings().getSettings()); this.diskIoBufferPool = config.getDiskIoBufferPool(); var rwl = new ReentrantReadWriteLock(); this.readLock = rwl.readLock();