Skip to content

Commit 62fd0fb

Browse files
authored
flushOnIdle use tryWithEngineOrNull (#134157)
Since it is an optimistic operation, we can let it be skipped if the shard is currently being reset. That way the IndexingMemoryController will not get stuck if a lot of shards are gradually having their engines being reset. Relates ES-12413
1 parent b6f5141 commit 62fd0fb

File tree

2 files changed

+103
-24
lines changed

2 files changed

+103
-24
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,6 +1528,10 @@ public Engine.FlushResult flush(FlushRequest request) {
15281528
* @param listener to notify after full durability has been achieved
15291529
*/
15301530
public void flush(FlushRequest request, ActionListener<Engine.FlushResult> listener) {
1531+
flush(request, this::getEngine, listener);
1532+
}
1533+
1534+
protected void flush(FlushRequest request, Supplier<Engine> engine, ActionListener<Engine.FlushResult> listener) {
15311535
final boolean waitIfOngoing = request.waitIfOngoing();
15321536
final boolean force = request.force();
15331537
logger.trace("flush with {}", request);
@@ -1539,7 +1543,9 @@ public void flush(FlushRequest request, ActionListener<Engine.FlushResult> liste
15391543
*/
15401544
verifyNotClosed();
15411545
final long startTime = System.nanoTime();
1542-
getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
1546+
assert engine != null : "engine supplier is null";
1547+
assert engine.get() != null : "engine is null";
1548+
engine.get().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
15431549
});
15441550
}
15451551

@@ -2545,33 +2551,46 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
25452551
* indexing operation, so we can flush the index.
25462552
*/
25472553
public void flushOnIdle(long inactiveTimeNS) {
2548-
Engine engineOrNull = getEngineOrNull();
2549-
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2550-
boolean wasActive = active.getAndSet(false);
2551-
if (wasActive) {
2552-
logger.debug("flushing shard on inactive");
2553-
threadPool.executor(ThreadPool.Names.FLUSH)
2554-
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
2555-
@Override
2556-
public void onResponse(Engine.FlushResult flushResult) {
2557-
if (flushResult.skippedDueToCollision()) {
2558-
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2559-
// will retry (#87888)
2554+
tryWithEngineOrNull(engineOrNull -> {
2555+
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2556+
boolean wasActive = active.getAndSet(false);
2557+
if (wasActive) {
2558+
logger.debug("flushing shard on inactive");
2559+
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
2560+
tryWithEngineOrNull(engine -> {
2561+
if (engine == null) {
2562+
// Revert active flag so that a next flushOnIdle request will retry
25602563
active.set(true);
2561-
}
2562-
periodicFlushMetric.inc();
2563-
}
2564+
} else {
2565+
flush(new FlushRequest().waitIfOngoing(false).force(false), () -> engine, new ActionListener<>() {
2566+
@Override
2567+
public void onResponse(Engine.FlushResult flushResult) {
2568+
if (flushResult.skippedDueToCollision()) {
2569+
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2570+
// will retry (#87888)
2571+
active.set(true);
2572+
}
2573+
periodicFlushMetric.inc();
2574+
}
25642575

2565-
@Override
2566-
public void onFailure(Exception e) {
2567-
if (state != IndexShardState.CLOSED) {
2568-
active.set(true);
2569-
logger.warn("failed to flush shard on inactive", e);
2576+
@Override
2577+
public void onFailure(Exception e) {
2578+
if (state != IndexShardState.CLOSED) {
2579+
active.set(true);
2580+
logger.warn("failed to flush shard on inactive", e);
2581+
}
2582+
}
2583+
});
25702584
}
2571-
}
2572-
}));
2585+
return null;
2586+
});
2587+
});
2588+
}
2589+
} else {
2590+
logger.trace(() -> "flush on idle skipped as it is either closed, engine being reset, or has been recently written to");
25732591
}
2574-
}
2592+
return null;
2593+
});
25752594
}
25762595

25772596
public boolean isActive() {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4497,6 +4497,66 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
44974497
closeShards(shard);
44984498
}
44994499

4500+
@TestLogging(reason = "testing traces of concurrent flush and engine reset", value = "org.elasticsearch.index.shard.IndexShard:TRACE")
4501+
public void testFlushOnIdleDoesNotWaitWhileEngineIsReset() throws Exception {
4502+
final var preparedForReset = new AtomicBoolean();
4503+
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
4504+
if (preparedForReset.get()) {
4505+
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
4506+
} else {
4507+
return new InternalEngine(config) {
4508+
@Override
4509+
public void prepareForEngineReset() throws IOException {
4510+
assertTrue(preparedForReset.compareAndSet(false, true));
4511+
}
4512+
};
4513+
}
4514+
});
4515+
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();
4516+
4517+
final var release = new CountDownLatch(1);
4518+
final var reset = new PlainActionFuture<Void>();
4519+
final var resetEngineThread = new Thread(() -> {
4520+
try {
4521+
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
4522+
try (permit) {
4523+
shard.resetEngine(newEngine -> {
4524+
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
4525+
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
4526+
safeAwait(release);
4527+
});
4528+
assertThat(preparedForReset.get(), equalTo(true));
4529+
l.onResponse(null);
4530+
}
4531+
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
4532+
} catch (Exception e) {
4533+
reset.onFailure(e);
4534+
}
4535+
});
4536+
resetEngineThread.start();
4537+
4538+
assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));
4539+
4540+
try (var mockLog = MockLog.capture(IndexShard.class)) {
4541+
mockLog.addExpectation(
4542+
new MockLog.SeenEventExpectation(
4543+
"should see flush on idle returning since it will not wait for the engine reset",
4544+
IndexShard.class.getCanonicalName(),
4545+
Level.TRACE,
4546+
"flush on idle skipped"
4547+
)
4548+
);
4549+
shard.flushOnIdle(0);
4550+
mockLog.awaitAllExpectationsMatched();
4551+
}
4552+
4553+
release.countDown();
4554+
safeGet(reset);
4555+
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
4556+
resetEngineThread.join();
4557+
closeShards(shard);
4558+
}
4559+
45004560
public void testMultiplePeriodicFlushesCanBeTriggeredBeforeTheyAreDurable() throws Exception {
45014561
List<ActionListener<Engine.FlushResult>> pendingListeners = Collections.synchronizedList(new ArrayList<>());
45024562
// Ensure that a single document forces a flush after each write

0 commit comments

Comments
 (0)