From 5d0e727370836b695a869131ea28a4367200699b Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 2 May 2025 15:10:00 +0200 Subject: [PATCH 1/3] Speed up translog writes by moving buffers where possible This serves two purposes: Save allocation of pages under the lock for low write-rate translogs and also make copying large documents a little less likely. --- .../stream/ReleasableBytesStreamOutput.java | 7 ++- .../index/translog/Translog.java | 14 +++--- .../index/translog/TranslogWriter.java | 43 +++++++++++-------- .../translog/TranslogDeletionPolicyTests.java | 7 +-- .../index/translog/TranslogTests.java | 35 +++++++++------ 5 files changed, 63 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index f9f66c9b3eb9f..e0af4cbd02f8d 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.Releasables; /** * An bytes stream output that allows providing a {@link BigArrays} instance @@ -36,7 +35,11 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { @Override public void close() { - Releasables.close(bytes); + var bytes = this.bytes; + if (bytes != null) { + this.bytes = null; + bytes.close(); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index b1a203616b120..f7c56d1e50c07 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -587,7 +587,6 @@ TranslogWriter createWriter( primaryTermSupplier.getAsLong(), tragedy, persistedSequenceNumberConsumer, - bigArrays, diskIoBufferPool, operationListener, operationAsserter, @@ -607,9 +606,9 @@ TranslogWriter createWriter( * @throws IOException if adding the operation to the translog resulted in an I/O exception */ public Location add(final Operation operation) throws IOException { - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) { + ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); + try { writeOperationWithSize(out, operation); - final BytesReference bytes = out.bytes(); readLock.lock(); try { ensureOpen(); @@ -630,7 +629,9 @@ public Location add(final Operation operation) throws IOException { + "]" ); } - return current.add(bytes, operation.seqNo()); + var res = current.add(out, operation.seqNo()); + out = null; + return res; } finally { readLock.unlock(); } @@ -640,6 +641,10 @@ public Location add(final Operation operation) throws IOException { } catch (final Exception ex) { closeOnTragicEvent(ex); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex); + } finally { + if (out != null) { + out.close(); + } } } @@ -1964,7 +1969,6 @@ public static String createEmptyTranslog( seqNo -> { throw new UnsupportedOperationException(); }, - BigArrays.NON_RECYCLING_INSTANCE, DiskIoBufferPool.INSTANCE, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 36b6709661017..2c19c4430dfbf 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.IOUtils; @@ -51,7 +50,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final ShardId shardId; private final FileChannel checkpointChannel; private final Path checkpointPath; - private final BigArrays bigArrays; // the last checkpoint that was written when the translog was last synced private volatile Checkpoint lastSyncedCheckpoint; /* the number of translog operations written to this file */ @@ -107,7 +105,6 @@ private TranslogWriter( TranslogHeader header, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -134,7 +131,6 @@ private TranslogWriter( assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer; - this.bigArrays = bigArrays; this.diskIoBufferPool = diskIoBufferPool; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.tragedy = tragedy; @@ -158,7 +154,6 @@ public static TranslogWriter create( long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer, - BigArrays bigArrays, DiskIoBufferPool diskIoBufferPool, OperationListener operationListener, TranslogOperationAsserter operationAsserter, @@ -203,7 +198,6 @@ public static TranslogWriter create( header, tragedy, persistedSequenceNumberConsumer, - bigArrays, diskIoBufferPool, operationListener, operationAsserter, @@ -235,7 +229,7 @@ private synchronized void closeWithTragicEvent(final Exception ex) { * @return the location the bytes were written to * @throws IOException if writing to the translog resulted in an I/O exception */ - public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { + public Translog.Location add(final ReleasableBytesStreamOutput data, final long seqNo) throws IOException { long bufferedBytesBeforeAdd = this.bufferedBytes; if (bufferedBytesBeforeAdd >= forceWriteThreshold) { writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4); @@ -244,13 +238,18 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws final Translog.Location location; synchronized (this) { ensureOpen(); + int len = data.size(); + final BytesReference bytes; if (buffer == null) { - buffer = new ReleasableBytesStreamOutput(bigArrays); + assert bufferedBytes == 0; + buffer = data; + bytes = data.bytes(); + } else { + assert bufferedBytes == buffer.size(); + data.bytes().writeTo(buffer); + data.close(); + bytes = buffer.bytes().slice((int) bufferedBytes, len); } - assert bufferedBytes == buffer.size(); - final long offset = totalOffset; - totalOffset += data.length(); - data.writeTo(buffer); assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; @@ -262,10 +261,12 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws operationCounter++; - assert assertNoSeqNumberConflict(seqNo, data); + assert assertNoSeqNumberConflict(seqNo, bytes); - location = new Translog.Location(generation, offset, data.length()); - operationListener.operationAdded(data, seqNo, location); + final long offset = totalOffset; + totalOffset = offset + len; + location = new Translog.Location(generation, offset, len); + operationListener.operationAdded(bytes, seqNo, location); bufferedBytes = buffer.size(); } @@ -554,7 +555,7 @@ private synchronized ReleasableBytesReference pollOpsToWrite() { } private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException { - try (ReleasableBytesReference toClose = toWrite) { + try { assert writeLock.isHeldByCurrentThread(); final int length = toWrite.length(); if (length == 0) { @@ -585,16 +586,22 @@ private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOExcep } } } + toWrite.close(); + toWrite = null; ioBuffer.flip(); writeToFile(ioBuffer); + } finally { + if (toWrite != null) { + toWrite.close(); + } } } @SuppressForbidden(reason = "Channel#write") private void writeToFile(ByteBuffer ioBuffer) throws IOException { - while (ioBuffer.remaining() > 0) { + do { channel.write(ioBuffer); - } + } while (ioBuffer.hasRemaining()); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 9aa847c837e96..39282755f3fe5 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -11,9 +11,6 @@ import org.apache.lucene.store.ByteArrayDataOutput; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; @@ -29,6 +26,7 @@ import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.index.translog.TranslogTests.wrapAsReleasableOutput; import static org.hamcrest.Matchers.equalTo; public class TranslogDeletionPolicyTests extends ESTestCase { @@ -93,7 +91,6 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, - BigArrays.NON_RECYCLING_INSTANCE, TranslogTests.RANDOMIZING_IO_BUFFERS, TranslogConfig.NOOP_OPERATION_LISTENER, TranslogOperationAsserter.DEFAULT, @@ -106,7 +103,7 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { out.reset(bytes); out.writeInt(ops); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); + writer.add(wrapAsReleasableOutput(bytes), ops); } } return new Tuple<>(readers, writer); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 69cf9d856ed4f..b81805fcfb220 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -34,10 +34,10 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -1339,7 +1339,7 @@ public void testTranslogWriter() throws IOException { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { seenSeqNos.add(seqNo); } - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo); + writer.add(wrapAsReleasableOutput(bytes), seqNo); } assertThat(persistedSeqNos, empty()); writer.sync(); @@ -1364,7 +1364,7 @@ public void testTranslogWriter() throws IOException { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(2048); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong()); if (reader instanceof TranslogReader) { ByteBuffer buffer = ByteBuffer.allocate(4); @@ -1387,6 +1387,16 @@ public void testTranslogWriter() throws IOException { IOUtils.close(writer); } + static ReleasableBytesStreamOutput wrapAsReleasableOutput(byte[] bytes) { + var out = new ReleasableBytesStreamOutput(NON_RECYCLING_INSTANCE); + try { + out.write(bytes); + } catch (IOException e) { + throw new AssertionError(e); + } + return out; + } + public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException { Path tempDir = createTempDir(); final TranslogConfig temp = getTranslogConfig(tempDir); @@ -1460,16 +1470,15 @@ ChannelFactory getChannelFactory() { TranslogWriter writer = translog.getCurrent(); int initialWriteCalls = writeCalls.get(); byte[] bytes = new byte[256]; - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); + writer.add(wrapAsReleasableOutput(bytes), 1); + writer.add(wrapAsReleasableOutput(bytes), 2); + writer.add(wrapAsReleasableOutput(bytes), 3); + writer.add(wrapAsReleasableOutput(bytes), 4); assertThat(persistedSeqNos, empty()); - assertEquals(initialWriteCalls, writeCalls.get()); if (randomBoolean()) { // Since the buffer is full, this will flush before performing the add. - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(wrapAsReleasableOutput(bytes), 5); assertThat(persistedSeqNos, empty()); assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); } else { @@ -1479,7 +1488,7 @@ ChannelFactory getChannelFactory() { assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); // Add after we the read flushed the buffer - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(wrapAsReleasableOutput(bytes), 5); } writer.sync(); @@ -1578,7 +1587,7 @@ ChannelFactory getChannelFactory() { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(new byte[4])); out.writeInt(1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); + writer.add(wrapAsReleasableOutput(bytes), 1); assertThat(persistedSeqNos, empty()); startBlocking.set(true); Thread thread = new Thread(() -> { @@ -1592,7 +1601,7 @@ ChannelFactory getChannelFactory() { writeStarted.await(); // Add will not block even though we are currently writing/syncing - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); + writer.add(wrapAsReleasableOutput(bytes), 2); blocker.countDown(); // Sync against so that both operations are written @@ -1693,7 +1702,7 @@ public void testCloseIntoReader() throws IOException { final byte[] bytes = new byte[4]; final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(i); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(wrapAsReleasableOutput(bytes), randomNonNegativeLong()); } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); From caa96ed35aaceef15cf243dcf49e3b050f77b277 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 7 May 2025 19:33:41 +0200 Subject: [PATCH 2/3] bug --- .../search/SearchApplicationIndexService.java | 26 +++--- ...rTimeDoubleGroupingAggregatorFunction.java | 81 +++++++++++++++---- ...erTimeFloatGroupingAggregatorFunction.java | 81 +++++++++++++++---- ...OverTimeIntGroupingAggregatorFunction.java | 80 ++++++++++++++---- ...verTimeLongGroupingAggregatorFunction.java | 81 +++++++++++++++---- 5 files changed, 274 insertions(+), 75 deletions(-) diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java index d0a687d8084cb..c403fa303a8af 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java @@ -291,19 +291,21 @@ private IndicesAliasesRequestBuilder updateAliasIndices(Set currentAlias } private void updateSearchApplication(SearchApplication app, boolean create, ActionListener listener) { - try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking())) { - try (XContentBuilder source = XContentFactory.jsonBuilder(buffer)) { - source.startObject() - .field(SearchApplication.NAME_FIELD.getPreferredName(), app.name()) - .field(SearchApplication.ANALYTICS_COLLECTION_NAME_FIELD.getPreferredName(), app.analyticsCollectionName()) - .field(SearchApplication.UPDATED_AT_MILLIS_FIELD.getPreferredName(), app.updatedAtMillis()) - .directFieldAsBase64( - SearchApplication.BINARY_CONTENT_FIELD.getPreferredName(), - os -> writeSearchApplicationBinaryWithVersion(app, os, clusterService.state().getMinTransportVersion()) - ) - .endObject(); - } + try ( + ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking()); + XContentBuilder source = XContentFactory.jsonBuilder(buffer) + ) { + source.startObject() + .field(SearchApplication.NAME_FIELD.getPreferredName(), app.name()) + .field(SearchApplication.ANALYTICS_COLLECTION_NAME_FIELD.getPreferredName(), app.analyticsCollectionName()) + .field(SearchApplication.UPDATED_AT_MILLIS_FIELD.getPreferredName(), app.updatedAtMillis()) + .directFieldAsBase64( + SearchApplication.BINARY_CONTENT_FIELD.getPreferredName(), + os -> writeSearchApplicationBinaryWithVersion(app, os, clusterService.state().getMinTransportVersion()) + ) + .endObject(); DocWriteRequest.OpType opType = (create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX); + source.flush(); final IndexRequest indexRequest = new IndexRequest(SEARCH_APPLICATION_ALIAS_NAME).opType(DocWriteRequest.OpType.INDEX) .id(app.name()) .opType(opType) diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java index b9ee302f45b24..c0e299d57f6bb 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleGroupingAggregatorFunction.java @@ -13,7 +13,8 @@ import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.DoubleVector; import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; @@ -72,7 +73,12 @@ public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenG } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); } @@ -88,7 +94,12 @@ public void close() { } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); } @@ -103,31 +114,45 @@ public void close() { }; } - private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values, + private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - if (values.isNull(groupPosition + positionOffset)) { + if (groups.isNull(groupPosition)) { continue; } - int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); - int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); - for (int v = valuesStart; v < valuesEnd; v++) { - FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } } } } - private void addRawInput(int positionOffset, IntVector groups, DoubleVector values, + private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - var valuePosition = groupPosition + positionOffset; - FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } } } - private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -149,7 +174,7 @@ private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values } } - private void addRawInput(int positionOffset, IntBlock groups, DoubleVector values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -165,6 +190,30 @@ private void addRawInput(int positionOffset, IntBlock groups, DoubleVector value } } + private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, DoubleVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition)); + } + } + @Override public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { state.enableGroupIdTracking(seenGroupIds); diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java index ad3f37cd22a00..df4b6c843ff75 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeFloatGroupingAggregatorFunction.java @@ -13,7 +13,8 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.FloatVector; -import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; @@ -72,7 +73,12 @@ public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenG } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); } @@ -88,7 +94,12 @@ public void close() { } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); } @@ -103,31 +114,45 @@ public void close() { }; } - private void addRawInput(int positionOffset, IntVector groups, FloatBlock values, + private void addRawInput(int positionOffset, IntArrayBlock groups, FloatBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - if (values.isNull(groupPosition + positionOffset)) { + if (groups.isNull(groupPosition)) { continue; } - int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); - int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); - for (int v = valuesStart; v < valuesEnd; v++) { - FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } } } } - private void addRawInput(int positionOffset, IntVector groups, FloatVector values, + private void addRawInput(int positionOffset, IntArrayBlock groups, FloatVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - var valuePosition = groupPosition + positionOffset; - FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } } } - private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -149,7 +174,7 @@ private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values, } } - private void addRawInput(int positionOffset, IntBlock groups, FloatVector values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -165,6 +190,30 @@ private void addRawInput(int positionOffset, IntBlock groups, FloatVector values } } + private void addRawInput(int positionOffset, IntVector groups, FloatBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, FloatVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition)); + } + } + @Override public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { state.enableGroupIdTracking(seenGroupIds); diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java index 9253aa51831b2..d0252f8b420d0 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeIntGroupingAggregatorFunction.java @@ -11,6 +11,8 @@ import java.util.List; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; @@ -70,7 +72,12 @@ public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenG } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); } @@ -86,7 +93,12 @@ public void close() { } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); } @@ -101,31 +113,45 @@ public void close() { }; } - private void addRawInput(int positionOffset, IntVector groups, IntBlock values, + private void addRawInput(int positionOffset, IntArrayBlock groups, IntBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - if (values.isNull(groupPosition + positionOffset)) { + if (groups.isNull(groupPosition)) { continue; } - int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); - int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); - for (int v = valuesStart; v < valuesEnd; v++) { - FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } } } } - private void addRawInput(int positionOffset, IntVector groups, IntVector values, + private void addRawInput(int positionOffset, IntArrayBlock groups, IntVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - var valuePosition = groupPosition + positionOffset; - FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } } } - private void addRawInput(int positionOffset, IntBlock groups, IntBlock values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -147,7 +173,7 @@ private void addRawInput(int positionOffset, IntBlock groups, IntBlock values, } } - private void addRawInput(int positionOffset, IntBlock groups, IntVector values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -163,6 +189,30 @@ private void addRawInput(int positionOffset, IntBlock groups, IntVector values, } } + private void addRawInput(int positionOffset, IntVector groups, IntBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, IntVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition)); + } + } + @Override public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { state.enableGroupIdTracking(seenGroupIds); diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java index e5a372c767b73..8506d1e8d527b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstOverTimeLongGroupingAggregatorFunction.java @@ -11,7 +11,8 @@ import java.util.List; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; @@ -70,7 +71,12 @@ public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenG } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector); } @@ -86,7 +92,12 @@ public void close() { } return new GroupingAggregatorFunction.AddInput() { @Override - public void add(int positionOffset, IntBlock groupIds) { + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { addRawInput(positionOffset, groupIds, valuesVector, timestampsVector); } @@ -101,31 +112,45 @@ public void close() { }; } - private void addRawInput(int positionOffset, IntVector groups, LongBlock values, + private void addRawInput(int positionOffset, IntArrayBlock groups, LongBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - if (values.isNull(groupPosition + positionOffset)) { + if (groups.isNull(groupPosition)) { continue; } - int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); - int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); - for (int v = valuesStart; v < valuesEnd; v++) { - FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } } } } - private void addRawInput(int positionOffset, IntVector groups, LongVector values, + private void addRawInput(int positionOffset, IntArrayBlock groups, LongVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - int groupId = groups.getInt(groupPosition); - var valuePosition = groupPosition + positionOffset; - FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } } } - private void addRawInput(int positionOffset, IntBlock groups, LongBlock values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongBlock values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -147,7 +172,7 @@ private void addRawInput(int positionOffset, IntBlock groups, LongBlock values, } } - private void addRawInput(int positionOffset, IntBlock groups, LongVector values, + private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongVector values, LongVector timestamps) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { @@ -163,6 +188,30 @@ private void addRawInput(int positionOffset, IntBlock groups, LongVector values, } } + private void addRawInput(int positionOffset, IntVector groups, LongBlock values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + if (values.isNull(groupPosition + positionOffset)) { + continue; + } + int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset); + int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset); + for (int v = valuesStart; v < valuesEnd; v++) { + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, LongVector values, + LongVector timestamps) { + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + var valuePosition = groupPosition + positionOffset; + FirstOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition)); + } + } + @Override public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { state.enableGroupIdTracking(seenGroupIds); From d34a6c60ebcdc153de25e1b1aaa809f1298dd704 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 8 May 2025 01:22:14 +0200 Subject: [PATCH 3/3] bug --- .../stream/ReleasableBytesStreamOutput.java | 7 ++--- .../search/SearchApplicationIndexService.java | 26 +++++++++---------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index e0af4cbd02f8d..f9f66c9b3eb9f 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; /** * An bytes stream output that allows providing a {@link BigArrays} instance @@ -35,11 +36,7 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { @Override public void close() { - var bytes = this.bytes; - if (bytes != null) { - this.bytes = null; - bytes.close(); - } + Releasables.close(bytes); } @Override diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java index c403fa303a8af..d0a687d8084cb 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/search/SearchApplicationIndexService.java @@ -291,21 +291,19 @@ private IndicesAliasesRequestBuilder updateAliasIndices(Set currentAlias } private void updateSearchApplication(SearchApplication app, boolean create, ActionListener listener) { - try ( - ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking()); - XContentBuilder source = XContentFactory.jsonBuilder(buffer) - ) { - source.startObject() - .field(SearchApplication.NAME_FIELD.getPreferredName(), app.name()) - .field(SearchApplication.ANALYTICS_COLLECTION_NAME_FIELD.getPreferredName(), app.analyticsCollectionName()) - .field(SearchApplication.UPDATED_AT_MILLIS_FIELD.getPreferredName(), app.updatedAtMillis()) - .directFieldAsBase64( - SearchApplication.BINARY_CONTENT_FIELD.getPreferredName(), - os -> writeSearchApplicationBinaryWithVersion(app, os, clusterService.state().getMinTransportVersion()) - ) - .endObject(); + try (ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking())) { + try (XContentBuilder source = XContentFactory.jsonBuilder(buffer)) { + source.startObject() + .field(SearchApplication.NAME_FIELD.getPreferredName(), app.name()) + .field(SearchApplication.ANALYTICS_COLLECTION_NAME_FIELD.getPreferredName(), app.analyticsCollectionName()) + .field(SearchApplication.UPDATED_AT_MILLIS_FIELD.getPreferredName(), app.updatedAtMillis()) + .directFieldAsBase64( + SearchApplication.BINARY_CONTENT_FIELD.getPreferredName(), + os -> writeSearchApplicationBinaryWithVersion(app, os, clusterService.state().getMinTransportVersion()) + ) + .endObject(); + } DocWriteRequest.OpType opType = (create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX); - source.flush(); final IndexRequest indexRequest = new IndexRequest(SEARCH_APPLICATION_ALIAS_NAME).opType(DocWriteRequest.OpType.INDEX) .id(app.name()) .opType(opType)