Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,10 @@ public Engine.FlushResult flush(FlushRequest request) {
* @param listener to notify after full durability has been achieved
*/
public void flush(FlushRequest request, ActionListener<Engine.FlushResult> listener) {
flush(request, this::getEngine, listener);
}

protected void flush(FlushRequest request, Supplier<Engine> engine, ActionListener<Engine.FlushResult> listener) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
Expand All @@ -1539,7 +1543,9 @@ public void flush(FlushRequest request, ActionListener<Engine.FlushResult> liste
*/
verifyNotClosed();
final long startTime = System.nanoTime();
getEngine().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
assert engine != null : "engine supplier is null";
assert engine.get() != null : "engine is null";
engine.get().flush(force, waitIfOngoing, ActionListener.runBefore(l, () -> flushMetric.inc(System.nanoTime() - startTime)));
});
}

Expand Down Expand Up @@ -2545,33 +2551,46 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
* indexing operation, so we can flush the index.
*/
public void flushOnIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH)
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
@Override
public void onResponse(Engine.FlushResult flushResult) {
if (flushResult.skippedDueToCollision()) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
tryWithEngineOrNull(engineOrNull -> {
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
tryWithEngineOrNull(engine -> {
if (engine == null) {
// Revert active flag so that a next flushOnIdle request will retry
active.set(true);
}
periodicFlushMetric.inc();
}
} else {
flush(new FlushRequest().waitIfOngoing(false).force(false), () -> engine, new ActionListener<>() {
@Override
public void onResponse(Engine.FlushResult flushResult) {
if (flushResult.skippedDueToCollision()) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
}
periodicFlushMetric.inc();
}

@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
active.set(true);
logger.warn("failed to flush shard on inactive", e);
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
active.set(true);
logger.warn("failed to flush shard on inactive", e);
}
}
});
}
}
}));
return null;
});
});
}
} else {
logger.trace(() -> "flush on idle skipped as it is either closed, engine being reset, or has been recently written to");
}
}
return null;
});
}

public boolean isActive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4497,6 +4497,66 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
closeShards(shard);
}

@TestLogging(reason = "testing traces of concurrent flush and engine reset", value = "org.elasticsearch.index.shard.IndexShard:TRACE")
public void testFlushOnIdleDoesNotWaitWhileEngineIsReset() throws Exception {
final var preparedForReset = new AtomicBoolean();
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
if (preparedForReset.get()) {
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
} else {
return new InternalEngine(config) {
@Override
public void prepareForEngineReset() throws IOException {
assertTrue(preparedForReset.compareAndSet(false, true));
}
};
}
});
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();

final var release = new CountDownLatch(1);
final var reset = new PlainActionFuture<Void>();
final var resetEngineThread = new Thread(() -> {
try {
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
try (permit) {
shard.resetEngine(newEngine -> {
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
safeAwait(release);
});
assertThat(preparedForReset.get(), equalTo(true));
l.onResponse(null);
}
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
} catch (Exception e) {
reset.onFailure(e);
}
});
resetEngineThread.start();

assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));

try (var mockLog = MockLog.capture(IndexShard.class)) {
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"should see flush on idle returning since it will not wait for the engine reset",
IndexShard.class.getCanonicalName(),
Level.TRACE,
"flush on idle skipped"
)
);
shard.flushOnIdle(0);
mockLog.awaitAllExpectationsMatched();
}

release.countDown();
safeGet(reset);
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
resetEngineThread.join();
closeShards(shard);
}

public void testMultiplePeriodicFlushesCanBeTriggeredBeforeTheyAreDurable() throws Exception {
List<ActionListener<Engine.FlushResult>> pendingListeners = Collections.synchronizedList(new ArrayList<>());
// Ensure that a single document forces a flush after each write
Expand Down