Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
Expand Down Expand Up @@ -2539,10 +2540,21 @@ public RefreshResult(boolean refreshed) {
}
}

public record FlushResult(boolean flushPerformed, long generation) {
/**
* The result of a {@link FlushRequest}.
*
* @param skippedDueToCollision signifies whether the flush request was skipped due to a collision detected. Specifically it is
* <code>true</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, which means
* the flush request was skipped, else it is <code>false</code>, which means there was no collision and
* the flush request was processed (even if in the end it did not actually flush anything).
* @param generation the generation of the index commit of the flush. may be {@link FlushResult#UNKNOWN_GENERATION} if
* unknown, e.g., if the flush was skipped or not performed ultimately.
*/
public record FlushResult(boolean skippedDueToCollision, long generation) {

public static final long UNKNOWN_GENERATION = -1L;
public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION);
public static final FlushResult FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION = new FlushResult(true, UNKNOWN_GENERATION);
public static final FlushResult FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED = new FlushResult(false, UNKNOWN_GENERATION);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2260,7 +2260,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing == false) {
logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
listener.onResponse(FlushResult.NO_FLUSH);
listener.onResponse(FlushResult.FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION);
return;
}
logger.trace("waiting for in-flight flush to finish");
Expand Down Expand Up @@ -2347,7 +2347,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
pruneDeletedTombstones();
}

waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));
waitForCommitDurability(generation, listener.map(v -> new FlushResult(false, generation)));
}

protected final boolean isFlushLockIsHeldByCurrentThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
listener.onResponse(new FlushResult(true, lastCommittedSegmentInfos.getGeneration()));
listener.onResponse(new FlushResult(false, lastCommittedSegmentInfos.getGeneration()));
}

@Override
Expand Down
26 changes: 9 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1508,11 +1508,10 @@ public BulkStats bulkStats() {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
* @return the flush result
*/
public boolean flush(FlushRequest request) {
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
public Engine.FlushResult flush(FlushRequest request) {
PlainActionFuture<Engine.FlushResult> future = new PlainActionFuture<>();
flush(request, future);
return future.actionGet();
}
Expand All @@ -1521,12 +1520,9 @@ public boolean flush(FlushRequest request) {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @param listener to notify after full durability has been achieved.
* <code>false</code> if <code>waitIfOngoing==false</code>
* and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
* @param listener to notify after full durability has been achieved
*/
public void flush(FlushRequest request, ActionListener<Boolean> listener) {
public void flush(FlushRequest request, ActionListener<Engine.FlushResult> listener) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
Expand All @@ -1538,11 +1534,7 @@ public void flush(FlushRequest request, ActionListener<Boolean> listener) {
*/
verifyNotClosed();
final long startTime = System.nanoTime();
getEngine().flush(
force,
waitIfOngoing,
ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime))
);
getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
});
}

Expand Down Expand Up @@ -2546,8 +2538,8 @@ public void flushOnIdle(long inactiveTimeNS) {
threadPool.executor(ThreadPool.Names.FLUSH)
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
@Override
public void onResponse(Boolean flushed) {
if (flushed == false) {
public void onResponse(Engine.FlushResult flushResult) {
if (flushResult.skippedDueToCollision()) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
Expand Down Expand Up @@ -4205,7 +4197,7 @@ public void afterWriteOperation() {
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
flush(new FlushRequest(), new ActionListener<>() {
@Override
public void onResponse(Boolean flushed) {
public void onResponse(Engine.FlushResult flushResult) {
periodicFlushMetric.inc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -190,7 +191,7 @@ private void executeOnPrimaryOrReplica(IndexShard indexShard, boolean phase1) th

public void testShardIsFlushed() throws Throwable {
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
doReturn(true).when(indexShard).flush(flushRequest.capture());
doReturn(Engine.FlushResult.FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED).when(indexShard).flush(flushRequest.capture());
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
assertThat(flushRequest.getValue().force(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4390,8 +4390,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
shard.flushOnIdle(0);
mockLog.awaitAllExpectationsMatched();

// A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately
assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)));
// A direct call to flush (with waitIfOngoing=false) should not wait and return immediately
assertTrue(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)).skippedDueToCollision());

// Allow first flushOnIdle to complete
readyToCompleteFlushLatch.countDown();
Expand All @@ -4410,8 +4410,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
// The second flushOnIdle (that did not happen) should have turned the active flag to true
assertTrue(shard.isActive());

// After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true
assertTrue(shard.flush(new FlushRequest()));
// After all the previous flushes are done, issue a final flush (for any remaining documents) that should be processed
assertFalse(shard.flush(new FlushRequest()).skippedDueToCollision());

closeShards(shard);
}
Expand Down Expand Up @@ -4511,7 +4511,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
assertThat(pendingListeners.size(), is(numberOfFlushes));
assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L)));

pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(true, 1)));
pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(false, 1)));
assertThat(shard.flushStats().getPeriodic(), is(equalTo((long) numberOfFlushes)));

closeShards(shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testGetFromTranslog() throws IOException {
PlainActionFuture<Engine.FlushResult> flushFuture = new PlainActionFuture<>();
engine.flush(true, true, flushFuture);
var flushResult = flushFuture.actionGet();
assertTrue(flushResult.flushPerformed());
assertFalse(flushResult.skippedDueToCollision());
assertThat(flushResult.generation(), equalTo(lastUnsafeGeneration + 1));
assertThat(engine.getLastUnsafeSegmentGenerationForGets(), equalTo(lastUnsafeGeneration));
// No longer in translog
Expand Down