Skip to content

Commit b5ea934

Browse files
committed
Make flushOnIdle use tryWithEngineOrNull
Since it is an optimistic operation, we can let it be skipped if the shard's engine is currently being reset. That way the IndexingMemoryController will not get stuck when many shards are gradually having their engines reset. Relates ES-12413
1 parent 5f7a2ed commit b5ea934

File tree

2 files changed

+87
-23
lines changed

2 files changed

+87
-23
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2545,33 +2545,37 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
25452545
* indexing operation, so we can flush the index.
25462546
*/
25472547
public void flushOnIdle(long inactiveTimeNS) {
2548-
Engine engineOrNull = getEngineOrNull();
2549-
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2550-
boolean wasActive = active.getAndSet(false);
2551-
if (wasActive) {
2552-
logger.debug("flushing shard on inactive");
2553-
threadPool.executor(ThreadPool.Names.FLUSH)
2554-
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
2555-
@Override
2556-
public void onResponse(Engine.FlushResult flushResult) {
2557-
if (flushResult.skippedDueToCollision()) {
2558-
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2559-
// will retry (#87888)
2560-
active.set(true);
2548+
tryWithEngineOrNull(engineOrNull -> {
2549+
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2550+
boolean wasActive = active.getAndSet(false);
2551+
if (wasActive) {
2552+
logger.debug("flushing shard on inactive");
2553+
threadPool.executor(ThreadPool.Names.FLUSH)
2554+
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
2555+
@Override
2556+
public void onResponse(Engine.FlushResult flushResult) {
2557+
if (flushResult.skippedDueToCollision()) {
2558+
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2559+
// will retry (#87888)
2560+
active.set(true);
2561+
}
2562+
periodicFlushMetric.inc();
25612563
}
2562-
periodicFlushMetric.inc();
2563-
}
25642564

2565-
@Override
2566-
public void onFailure(Exception e) {
2567-
if (state != IndexShardState.CLOSED) {
2568-
active.set(true);
2569-
logger.warn("failed to flush shard on inactive", e);
2565+
@Override
2566+
public void onFailure(Exception e) {
2567+
if (state != IndexShardState.CLOSED) {
2568+
active.set(true);
2569+
logger.warn("failed to flush shard on inactive", e);
2570+
}
25702571
}
2571-
}
2572-
}));
2572+
}));
2573+
}
2574+
} else {
2575+
logger.trace(() -> "flush on idle skipped as it is either closed, engine being reset, or has been recently written to");
25732576
}
2574-
}
2577+
return null;
2578+
});
25752579
}
25762580

25772581
public boolean isActive() {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4497,6 +4497,66 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
44974497
closeShards(shard);
44984498
}
44994499

4500+
@TestLogging(reason = "testing traces of concurrent flush and engine reset", value = "org.elasticsearch.index.shard.IndexShard:TRACE")
4501+
public void testFlushOnIdleDoesNotWaitWhileEngineIsReset() throws Exception {
4502+
final var preparedForReset = new AtomicBoolean();
4503+
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
4504+
if (preparedForReset.get()) {
4505+
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
4506+
} else {
4507+
return new InternalEngine(config) {
4508+
@Override
4509+
public void prepareForEngineReset() throws IOException {
4510+
assertTrue(preparedForReset.compareAndSet(false, true));
4511+
}
4512+
};
4513+
}
4514+
});
4515+
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();
4516+
4517+
final var release = new CountDownLatch(1);
4518+
final var reset = new PlainActionFuture<Void>();
4519+
final var resetEngineThread = new Thread(() -> {
4520+
try {
4521+
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
4522+
try (permit) {
4523+
shard.resetEngine(newEngine -> {
4524+
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
4525+
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
4526+
safeAwait(release);
4527+
});
4528+
assertThat(preparedForReset.get(), equalTo(true));
4529+
l.onResponse(null);
4530+
}
4531+
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
4532+
} catch (Exception e) {
4533+
reset.onFailure(e);
4534+
}
4535+
});
4536+
resetEngineThread.start();
4537+
4538+
assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));
4539+
4540+
try (var mockLog = MockLog.capture(IndexShard.class)) {
4541+
mockLog.addExpectation(
4542+
new MockLog.SeenEventExpectation(
4543+
"should see flush on idle returning since it will not wait for the engine reset",
4544+
IndexShard.class.getCanonicalName(),
4545+
Level.TRACE,
4546+
"flush on idle skipped"
4547+
)
4548+
);
4549+
shard.flushOnIdle(0);
4550+
mockLog.awaitAllExpectationsMatched();
4551+
}
4552+
4553+
release.countDown();
4554+
safeGet(reset);
4555+
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
4556+
resetEngineThread.join();
4557+
closeShards(shard);
4558+
}
4559+
45004560
public void testMultiplePeriodicFlushesCanBeTriggeredBeforeTheyAreDurable() throws Exception {
45014561
List<ActionListener<Engine.FlushResult>> pendingListeners = Collections.synchronizedList(new ArrayList<>());
45024562
// Ensure that a single document forces a flush after each write

0 commit comments

Comments
 (0)