diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 636b18c06df66..88c36e2061f84 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -38,6 +38,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; @@ -2539,10 +2540,21 @@ public RefreshResult(boolean refreshed) { } } - public record FlushResult(boolean flushPerformed, long generation) { + /** + * The result of a {@link FlushRequest}. + * + * @param skippedDueToCollision signifies whether the flush request was skipped due to a collision detected. Specifically it is + * true if waitIfOngoing==false and an ongoing request is detected, which means + * the flush request was skipped, else it is false, which means there was no collision and + * the flush request was processed (even if in the end it did not actually flush anything). + * @param generation the generation of the index commit of the flush. may be {@link FlushResult#UNKNOWN_GENERATION} if + * unknown, e.g., if the flush was skipped or not performed ultimately. + */ + public record FlushResult(boolean skippedDueToCollision, long generation) { public static final long UNKNOWN_GENERATION = -1L; - public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION); + public static final FlushResult FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION = new FlushResult(true, UNKNOWN_GENERATION); + public static final FlushResult FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED = new FlushResult(false, UNKNOWN_GENERATION); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1bed195862959..db4686bb4faf1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2260,7 +2260,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); - listener.onResponse(FlushResult.NO_FLUSH); + listener.onResponse(FlushResult.FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION); return; } logger.trace("waiting for in-flight flush to finish"); @@ -2347,7 +2347,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList pruneDeletedTombstones(); } - waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation))); + waitForCommitDurability(generation, listener.map(v -> new FlushResult(false, generation))); } protected final boolean isFlushLockIsHeldByCurrentThread() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 800854dcedb0a..e5531b10c393c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -452,7 +452,7 @@ public boolean shouldPeriodicallyFlush() { @Override protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener listener) throws EngineException { - listener.onResponse(new FlushResult(true, lastCommittedSegmentInfos.getGeneration())); + listener.onResponse(new FlushResult(false, lastCommittedSegmentInfos.getGeneration())); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6641fd8cebe44..1e79f8a425896 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1508,11 +1508,10 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request - * @return false if waitIfOngoing==false and an ongoing request is detected, else true. - * If false is returned, no flush happened. + * @return the flush result */ - public boolean flush(FlushRequest request) { - PlainActionFuture future = new PlainActionFuture<>(); + public Engine.FlushResult flush(FlushRequest request) { + PlainActionFuture future = new PlainActionFuture<>(); flush(request, future); return future.actionGet(); } @@ -1521,12 +1520,9 @@ public boolean flush(FlushRequest request) { * Executes the given flush request against the engine. * * @param request the flush request - * @param listener to notify after full durability has been achieved. - * false if waitIfOngoing==false - * and an ongoing request is detected, else true. - * If false is returned, no flush happened. + * @param listener to notify after full durability has been achieved */ - public void flush(FlushRequest request, ActionListener listener) { + public void flush(FlushRequest request, ActionListener listener) { final boolean waitIfOngoing = request.waitIfOngoing(); final boolean force = request.force(); logger.trace("flush with {}", request); @@ -1538,11 +1534,7 @@ public void flush(FlushRequest request, ActionListener listener) { */ verifyNotClosed(); final long startTime = System.nanoTime(); - getEngine().flush( - force, - waitIfOngoing, - ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime)) - ); + getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime))); }); } @@ -2546,8 +2538,8 @@ public void flushOnIdle(long inactiveTimeNS) { threadPool.executor(ThreadPool.Names.FLUSH) .execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() { @Override - public void onResponse(Boolean flushed) { - if (flushed == false) { + public void onResponse(Engine.FlushResult flushResult) { + if (flushResult.skippedDueToCollision()) { // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request // will retry (#87888) active.set(true); @@ -4205,7 +4197,7 @@ public void afterWriteOperation() { threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> { flush(new FlushRequest(), new ActionListener<>() { @Override - public void onResponse(Boolean flushed) { + public void onResponse(Engine.FlushResult flushResult) { periodicFlushMetric.inc(); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index cb5d10ac3c5c0..0029a307d107e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -190,7 +191,7 @@ private void executeOnPrimaryOrReplica(IndexShard indexShard, boolean phase1) th public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - doReturn(true).when(indexShard).flush(flushRequest.capture()); + doReturn(Engine.FlushResult.FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED).when(indexShard).flush(flushRequest.capture()); executeOnPrimaryOrReplica(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); assertThat(flushRequest.getValue().force(), is(true)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b40c7327c44d1..61fae7765d2a7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4390,8 +4390,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl shard.flushOnIdle(0); mockLog.awaitAllExpectationsMatched(); - // A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately - assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); + // A direct call to flush (with waitIfOngoing=false) should not wait and return immediately + assertTrue(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)).skippedDueToCollision()); // Allow first flushOnIdle to complete readyToCompleteFlushLatch.countDown(); @@ -4410,8 +4410,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl // The second flushOnIdle (that did not happen) should have turned the active flag to true assertTrue(shard.isActive()); - // After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true - assertTrue(shard.flush(new FlushRequest())); + // After all the previous flushes are done, issue a final flush (for any remaining documents) that should be processed + assertFalse(shard.flush(new FlushRequest()).skippedDueToCollision()); closeShards(shard); } @@ -4511,7 +4511,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList assertThat(pendingListeners.size(), is(numberOfFlushes)); assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L))); - pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(true, 1))); + pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(false, 1))); assertThat(shard.flushStats().getPeriodic(), is(equalTo((long) numberOfFlushes))); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index f38b6ef660eff..41f70541dbf1b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -344,7 +344,7 @@ public void testGetFromTranslog() throws IOException { PlainActionFuture flushFuture = new PlainActionFuture<>(); engine.flush(true, true, flushFuture); var flushResult = flushFuture.actionGet(); - assertTrue(flushResult.flushPerformed()); + assertFalse(flushResult.skippedDueToCollision()); assertThat(flushResult.generation(), equalTo(lastUnsafeGeneration + 1)); assertThat(engine.getLastUnsafeSegmentGenerationForGets(), equalTo(lastUnsafeGeneration)); // No longer in translog