From 02cda314c0d8bbb889822d6f2d415ff64b4281cd Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Wed, 30 Jul 2025 12:04:40 +0300 Subject: [PATCH] Clarify flush result when skipped due to collision The unclear boolean flag before was easy to misinterpret. This PR clarifies the boolean flag better, to understand that it details to whether a flush request was skipped due to collisions or not (rather than the flush request being performed or not). --- .../elasticsearch/index/engine/Engine.java | 16 ++++++++++-- .../index/engine/InternalEngine.java | 4 +-- .../index/engine/ReadOnlyEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 26 +++++++------------ ...portVerifyShardBeforeCloseActionTests.java | 3 ++- .../index/shard/IndexShardTests.java | 10 +++---- .../index/shard/ShardGetServiceTests.java | 2 +- 7 files changed, 34 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b6120e73a5de7..5031af8d6662f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -36,6 +36,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; @@ -2490,10 +2491,21 @@ public RefreshResult(boolean refreshed) { } } - public record FlushResult(boolean flushPerformed, long generation) { + /** + * The result of a {@link FlushRequest}. + * + * @param skippedDueToCollision signifies whether the flush request was skipped due to a collision detected. Specifically it is + * true if waitIfOngoing==false and an ongoing request is detected, which means + * the flush request was skipped, else it is false, which means there was no collision and + * the flush request was processed (even if in the end it did not actually flush anything). + * @param generation the generation of the index commit of the flush. may be {@link FlushResult#UNKNOWN_GENERATION} if + * unknown, e.g., if the flush was skipped or not performed ultimately. + */ + public record FlushResult(boolean skippedDueToCollision, long generation) { public static final long UNKNOWN_GENERATION = -1L; - public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION); + public static final FlushResult FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION = new FlushResult(true, UNKNOWN_GENERATION); + public static final FlushResult FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED = new FlushResult(false, UNKNOWN_GENERATION); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7753decfc4854..ede0b18ce99b7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2260,7 +2260,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); - listener.onResponse(FlushResult.NO_FLUSH); + listener.onResponse(FlushResult.FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION); return; } logger.trace("waiting for in-flight flush to finish"); @@ -2347,7 +2347,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList pruneDeletedTombstones(); } - waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation))); + waitForCommitDurability(generation, listener.map(v -> new FlushResult(false, generation))); } protected final boolean isFlushLockIsHeldByCurrentThread() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 2388154494ad4..35a0457341eba 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -452,7 +452,7 @@ public boolean shouldPeriodicallyFlush() { @Override protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener listener) throws EngineException { - listener.onResponse(new FlushResult(true, lastCommittedSegmentInfos.getGeneration())); + listener.onResponse(new FlushResult(false, lastCommittedSegmentInfos.getGeneration())); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 59129f9911da2..b9e1d6fcf0a31 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1503,11 +1503,10 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return false if waitIfOngoing==false and an ongoing request is detected, else true. - * If false is returned, no flush happened. + * @return the flush result */ - public boolean flush(FlushRequest request) { - PlainActionFuture future = new PlainActionFuture<>(); + public Engine.FlushResult flush(FlushRequest request) { + PlainActionFuture future = new PlainActionFuture<>(); flush(request, future); return future.actionGet(); } @@ -1516,12 +1515,9 @@ public boolean flush(FlushRequest request) { * Executes the given flush request against the engine. * * @param request the flush request - * @param listener to notify after full durability has been achieved. - * false if waitIfOngoing==false - * and an ongoing request is detected, else true. - * If false is returned, no flush happened. + * @param listener to notify after full durability has been achieved */ - public void flush(FlushRequest request, ActionListener listener) { + public void flush(FlushRequest request, ActionListener listener) { final boolean waitIfOngoing = request.waitIfOngoing(); final boolean force = request.force(); logger.trace("flush with {}", request); @@ -1533,11 +1529,7 @@ public void flush(FlushRequest request, ActionListener listener) { */ verifyNotClosed(); final long startTime = System.nanoTime(); - getEngine().flush( - force, - waitIfOngoing, - ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime)) - ); + getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime))); }); } @@ -2541,8 +2533,8 @@ public void flushOnIdle(long inactiveTimeNS) { threadPool.executor(ThreadPool.Names.FLUSH) .execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() { @Override - public void onResponse(Boolean flushed) { - if (flushed == false) { + public void onResponse(Engine.FlushResult flushResult) { + if (flushResult.skippedDueToCollision()) { // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request // will retry (#87888) active.set(true); @@ -4151,7 +4143,7 @@ public void afterWriteOperation() { threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> { flush(new FlushRequest(), new ActionListener<>() { @Override - public void onResponse(Boolean flushed) { + public void onResponse(Engine.FlushResult flushResult) { periodicFlushMetric.inc(); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index cb5d10ac3c5c0..0029a307d107e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -190,7 +191,7 @@ private void executeOnPrimaryOrReplica(IndexShard indexShard, boolean phase1) th public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - doReturn(true).when(indexShard).flush(flushRequest.capture()); + doReturn(Engine.FlushResult.FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED).when(indexShard).flush(flushRequest.capture()); executeOnPrimaryOrReplica(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); assertThat(flushRequest.getValue().force(), is(true)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d3330cfc17216..ee75ac2bfe4fe 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4342,8 +4342,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl shard.flushOnIdle(0); mockLog.awaitAllExpectationsMatched(); - // A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately - assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); + // A direct call to flush (with waitIfOngoing=false) should not wait and return immediately + assertTrue(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)).skippedDueToCollision()); // Allow first flushOnIdle to complete readyToCompleteFlushLatch.countDown(); @@ -4362,8 +4362,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl // The second flushOnIdle (that did not happen) should have turned the active flag to true assertTrue(shard.isActive()); - // After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true - assertTrue(shard.flush(new FlushRequest())); + // After all the previous flushes are done, issue a final flush (for any remaining documents) that should be processed + assertFalse(shard.flush(new FlushRequest()).skippedDueToCollision()); closeShards(shard); } @@ -4463,7 +4463,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList assertThat(pendingListeners.size(), is(numberOfFlushes)); assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L))); - pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(true, 1))); + pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(false, 1))); assertThat(shard.flushStats().getPeriodic(), is(equalTo((long) numberOfFlushes))); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index f38b6ef660eff..41f70541dbf1b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -344,7 +344,7 @@ public void testGetFromTranslog() throws IOException { PlainActionFuture flushFuture = new PlainActionFuture<>(); engine.flush(true, true, flushFuture); var flushResult = flushFuture.actionGet(); - assertTrue(flushResult.flushPerformed()); + assertFalse(flushResult.skippedDueToCollision()); assertThat(flushResult.generation(), equalTo(lastUnsafeGeneration + 1)); assertThat(engine.getLastUnsafeSegmentGenerationForGets(), equalTo(lastUnsafeGeneration)); // No longer in translog