Skip to content

Commit 02cda31

Browse files
committed
Clarify flush result when skipped due to collision
The unclear boolean flag before was easy to misinterpret. This PR clarifies the boolean flag better, to understand that it details to whether a flush request was skipped due to collisions or not (rather than the flush request being performed or not).
1 parent 9bab4d1 commit 02cda31

File tree

7 files changed

+34
-29
lines changed

7 files changed

+34
-29
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.lucene.util.SetOnce;
3737
import org.elasticsearch.ExceptionsHelper;
3838
import org.elasticsearch.action.ActionListener;
39+
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
3940
import org.elasticsearch.action.index.IndexRequest;
4041
import org.elasticsearch.action.support.PlainActionFuture;
4142
import org.elasticsearch.action.support.SubscribableListener;
@@ -2490,10 +2491,21 @@ public RefreshResult(boolean refreshed) {
24902491
}
24912492
}
24922493

2493-
public record FlushResult(boolean flushPerformed, long generation) {
2494+
/**
2495+
* The result of a {@link FlushRequest}.
2496+
*
2497+
* @param skippedDueToCollision signifies whether the flush request was skipped due to a collision detected. Specifically it is
2498+
* <code>true</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, which means
2499+
* the flush request was skipped, else it is <code>false</code>, which means there was no collision and
2500+
* the flush request was processed (even if in the end it did not actually flush anything).
2501+
* @param generation the generation of the index commit of the flush. may be {@link FlushResult#UNKNOWN_GENERATION} if
2502+
* unknown, e.g., if the flush was skipped or not performed ultimately.
2503+
*/
2504+
public record FlushResult(boolean skippedDueToCollision, long generation) {
24942505

24952506
public static final long UNKNOWN_GENERATION = -1L;
2496-
public static final FlushResult NO_FLUSH = new FlushResult(false, UNKNOWN_GENERATION);
2507+
public static final FlushResult FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION = new FlushResult(true, UNKNOWN_GENERATION);
2508+
public static final FlushResult FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED = new FlushResult(false, UNKNOWN_GENERATION);
24972509
}
24982510

24992511
/**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2260,7 +2260,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
22602260
// if we can't get the lock right away we block if needed otherwise barf
22612261
if (waitIfOngoing == false) {
22622262
logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
2263-
listener.onResponse(FlushResult.NO_FLUSH);
2263+
listener.onResponse(FlushResult.FLUSH_REQUEST_SKIPPED_DUE_TO_COLLISION);
22642264
return;
22652265
}
22662266
logger.trace("waiting for in-flight flush to finish");
@@ -2347,7 +2347,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
23472347
pruneDeletedTombstones();
23482348
}
23492349

2350-
waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));
2350+
waitForCommitDurability(generation, listener.map(v -> new FlushResult(false, generation)));
23512351
}
23522352

23532353
protected final boolean isFlushLockIsHeldByCurrentThread() {

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ public boolean shouldPeriodicallyFlush() {
452452

453453
@Override
454454
protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
455-
listener.onResponse(new FlushResult(true, lastCommittedSegmentInfos.getGeneration()));
455+
listener.onResponse(new FlushResult(false, lastCommittedSegmentInfos.getGeneration()));
456456
}
457457

458458
@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,11 +1503,10 @@ public BulkStats bulkStats() {
15031503
* Executes the given flush request against the engine.
15041504
*
15051505
* @param request the flush request
1506-
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
1507-
* If <code>false</code> is returned, no flush happened.
1506+
* @return the flush result
15081507
*/
1509-
public boolean flush(FlushRequest request) {
1510-
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
1508+
public Engine.FlushResult flush(FlushRequest request) {
1509+
PlainActionFuture<Engine.FlushResult> future = new PlainActionFuture<>();
15111510
flush(request, future);
15121511
return future.actionGet();
15131512
}
@@ -1516,12 +1515,9 @@ public boolean flush(FlushRequest request) {
15161515
* Executes the given flush request against the engine.
15171516
*
15181517
* @param request the flush request
1519-
* @param listener to notify after full durability has been achieved.
1520-
* <code>false</code> if <code>waitIfOngoing==false</code>
1521-
* and an ongoing request is detected, else <code>true</code>.
1522-
* If <code>false</code> is returned, no flush happened.
1518+
* @param listener to notify after full durability has been achieved
15231519
*/
1524-
public void flush(FlushRequest request, ActionListener<Boolean> listener) {
1520+
public void flush(FlushRequest request, ActionListener<Engine.FlushResult> listener) {
15251521
final boolean waitIfOngoing = request.waitIfOngoing();
15261522
final boolean force = request.force();
15271523
logger.trace("flush with {}", request);
@@ -1533,11 +1529,7 @@ public void flush(FlushRequest request, ActionListener<Boolean> listener) {
15331529
*/
15341530
verifyNotClosed();
15351531
final long startTime = System.nanoTime();
1536-
getEngine().flush(
1537-
force,
1538-
waitIfOngoing,
1539-
ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime))
1540-
);
1532+
getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
15411533
});
15421534
}
15431535

@@ -2541,8 +2533,8 @@ public void flushOnIdle(long inactiveTimeNS) {
25412533
threadPool.executor(ThreadPool.Names.FLUSH)
25422534
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
25432535
@Override
2544-
public void onResponse(Boolean flushed) {
2545-
if (flushed == false) {
2536+
public void onResponse(Engine.FlushResult flushResult) {
2537+
if (flushResult.skippedDueToCollision()) {
25462538
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
25472539
// will retry (#87888)
25482540
active.set(true);
@@ -4151,7 +4143,7 @@ public void afterWriteOperation() {
41514143
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
41524144
flush(new FlushRequest(), new ActionListener<>() {
41534145
@Override
4154-
public void onResponse(Boolean flushed) {
4146+
public void onResponse(Engine.FlushResult flushResult) {
41554147
periodicFlushMetric.inc();
41564148
}
41574149

server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.IndexNotFoundException;
3838
import org.elasticsearch.index.IndexVersion;
39+
import org.elasticsearch.index.engine.Engine;
3940
import org.elasticsearch.index.shard.IndexShard;
4041
import org.elasticsearch.index.shard.ReplicationGroup;
4142
import org.elasticsearch.index.shard.ShardId;
@@ -190,7 +191,7 @@ private void executeOnPrimaryOrReplica(IndexShard indexShard, boolean phase1) th
190191

191192
public void testShardIsFlushed() throws Throwable {
192193
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
193-
doReturn(true).when(indexShard).flush(flushRequest.capture());
194+
doReturn(Engine.FlushResult.FLUSH_REQUEST_PROCESSED_AND_NOT_PERFORMED).when(indexShard).flush(flushRequest.capture());
194195
executeOnPrimaryOrReplica();
195196
verify(indexShard, times(1)).flush(any(FlushRequest.class));
196197
assertThat(flushRequest.getValue().force(), is(true));

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4342,8 +4342,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
43424342
shard.flushOnIdle(0);
43434343
mockLog.awaitAllExpectationsMatched();
43444344

4345-
// A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately
4346-
assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)));
4345+
// A direct call to flush (with waitIfOngoing=false) should not wait and return immediately
4346+
assertTrue(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)).skippedDueToCollision());
43474347

43484348
// Allow first flushOnIdle to complete
43494349
readyToCompleteFlushLatch.countDown();
@@ -4362,8 +4362,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
43624362
// The second flushOnIdle (that did not happen) should have turned the active flag to true
43634363
assertTrue(shard.isActive());
43644364

4365-
// After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true
4366-
assertTrue(shard.flush(new FlushRequest()));
4365+
// After all the previous flushes are done, issue a final flush (for any remaining documents) that should be processed
4366+
assertFalse(shard.flush(new FlushRequest()).skippedDueToCollision());
43674367

43684368
closeShards(shard);
43694369
}
@@ -4463,7 +4463,7 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
44634463
assertThat(pendingListeners.size(), is(numberOfFlushes));
44644464
assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L)));
44654465

4466-
pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(true, 1)));
4466+
pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(false, 1)));
44674467
assertThat(shard.flushStats().getPeriodic(), is(equalTo((long) numberOfFlushes)));
44684468

44694469
closeShards(shard);

server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public void testGetFromTranslog() throws IOException {
344344
PlainActionFuture<Engine.FlushResult> flushFuture = new PlainActionFuture<>();
345345
engine.flush(true, true, flushFuture);
346346
var flushResult = flushFuture.actionGet();
347-
assertTrue(flushResult.flushPerformed());
347+
assertFalse(flushResult.skippedDueToCollision());
348348
assertThat(flushResult.generation(), equalTo(lastUnsafeGeneration + 1));
349349
assertThat(engine.getLastUnsafeSegmentGenerationForGets(), equalTo(lastUnsafeGeneration));
350350
// No longer in translog

0 commit comments

Comments
 (0)