Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 96 additions & 55 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1347,11 +1347,12 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>
* Returns how many bytes we are currently moving from heap to disk
*/
public long getWritingBytes() {
Engine engine = getEngineOrNull();
if (engine == null) {
return 0;
}
return engine.getWritingBytes();
return tryWithEngineOrNull(engine -> {
if (engine == null) {
return 0L;
}
return engine.getWritingBytes();
});
}

public RefreshStats refreshStats() {
Expand All @@ -1366,12 +1367,13 @@ public RefreshStats refreshStats() {
}

public FlushStats flushStats() {
final Engine engine = getEngineOrNull();
return new FlushStats(
flushMetric.count(),
periodicFlushMetric.count(),
TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()),
engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L
return tryWithEngineOrNull(
engine -> new FlushStats(
flushMetric.count(),
periodicFlushMetric.count(),
TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()),
engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly a note for myself, we could extract some stats at the shard level and pass them down to the engine instances so that they "survive" resets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, was thinking also of that, although there may be some concurrency challenges to handle. Also, since we will soon shorten the reset period drastically, I am worried about doing any more effort on this front, including this PR (which could arguable be skipped if we didn't have long resets). However, such efforts might be useful in the future if resets become long again or there may be other engines being reset.

)
);
}

Expand All @@ -1397,30 +1399,33 @@ public SeqNoStats seqNoStats() {
}

public IndexingStats indexingStats() {
Engine engine = getEngineOrNull();
final boolean throttled;
final long throttleTimeInMillis;
if (engine == null) {
throttled = false;
throttleTimeInMillis = 0;
} else {
throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}
return tryWithEngineOrNull(engine -> {
final boolean throttled;
final long throttleTimeInMillis;
if (engine == null) {
throttled = false;
throttleTimeInMillis = 0;
} else {
throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}

long currentTimeInNanos = getRelativeTimeInNanos();
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started.
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L;
return internalIndexingStats.stats(
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
indexingTaskExecutionTimeBeforeShardStartedInNanos,
timeSinceShardStartedInNanos,
currentTimeInNanos,
recentIndexingLoadAtShardStarted
);
long currentTimeInNanos = getRelativeTimeInNanos();
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started.
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L)
? (currentTimeInNanos - startedRelativeTimeInNanos)
: 0L;
return internalIndexingStats.stats(
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
indexingTaskExecutionTimeBeforeShardStartedInNanos,
timeSinceShardStartedInNanos,
currentTimeInNanos,
recentIndexingLoadAtShardStarted
);
});
}

public SearchStats searchStats(String... groups) {
Expand Down Expand Up @@ -2513,15 +2518,16 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
*/
public long getIndexBufferRAMBytesUsed() {
Engine engine = getEngineOrNull();
if (engine == null) {
return 0;
}
try {
return engine.getIndexBufferRAMBytesUsed();
} catch (AlreadyClosedException ex) {
return 0;
}
return tryWithEngineOrNull(engine -> {
if (engine == null) {
return 0L;
}
try {
return engine.getIndexBufferRAMBytesUsed();
} catch (AlreadyClosedException ex) {
return 0L;
}
});
}

public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
Expand Down Expand Up @@ -3366,6 +3372,10 @@ private void doCheckIndex() throws IOException {
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
}

/**
* @deprecated use {@link #withEngine(Function)} instead, which considers the situation an engine may be reset / changed.
*/
@Deprecated
Engine getEngine() {
engineResetLock.readLock().lock();
try {
Expand All @@ -3378,7 +3388,10 @@ Engine getEngine() {
/**
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed.
*
* @deprecated use the try/withEngine* family of functions instead, which consider the situation an engine may be reset / changed.
*/
@Deprecated
public Engine getEngineOrNull() {
engineResetLock.readLock().lock();
try {
Expand All @@ -3404,14 +3417,28 @@ private Engine getAndSetCurrentEngine(Engine newEngine) {

/**
* Executes an operation while preventing the shard's engine instance to be reset during the execution.
* The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed.
* The operation might be executed with a {@code null} engine instance in case the engine/shard is closed or the engine is being
* reset. The engine might be closed while the operation is executed.
*
* @param operation the operation to execute
* @return the result of the operation
* @param <R> the type of the result
*/
public <R> R tryWithEngineOrNull(Function<Engine, R> operation) {
return withEngine(operation, true, true);
}

/**
* Executes an operation while preventing the shard's engine instance to be reset during the execution.
* The operation might be executed with a {@code null} engine instance in case the engine/shard is closed. The engine might be
* closed while the operation is executed.
*
* @param operation the operation to execute
* @return the result of the operation
* @param <R> the type of the result
*/
public <R> R withEngineOrNull(Function<Engine, R> operation) {
return withEngine(operation, true);
return withEngine(operation, true, false);
}

/**
Expand All @@ -3425,7 +3452,7 @@ public <R> R withEngineOrNull(Function<Engine, R> operation) {
* @throws AlreadyClosedException if the current engine instance is {@code null}.
*/
public <R> R withEngine(Function<Engine, R> operation) {
return withEngine(operation, false);
return withEngine(operation, false, false);
}

/**
Expand Down Expand Up @@ -3460,22 +3487,36 @@ public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R,
* The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}.
* When {@code allowNoEngine} is set to {@code `false`} the method will throw an {@link AlreadyClosedException} if the current engine
* instance is {@code null}.
* The parameter {@code noEngineDuringReset} is used to execute the operation with a {@code null} engine instance if the engine is
* being reset at the time the function is invoked. This is useful for operations that access a shard's engine periodically, e.g.,
* to get statistics, and are OK to skip it if the engine is being reset, rather than blocking until reset is complete. Note that
* it is illegal to set this parameter to true if {@code allowNoEngine} is set to false.
*
* @param operation the operation to execute
* @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null}
* @return the result of the operation
* @param <R> the type of the result
*/
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
assert assertCurrentThreadWithEngine();
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, boolean noEngineDuringReset) {
assert operation != null;

engineResetLock.readLock().lock();
try {
var engine = getCurrentEngine(allowNoEngine);
return operation.apply(engine);
} finally {
engineResetLock.readLock().unlock();
assert allowNoEngine || noEngineDuringReset == false
: "noEngineDuringReset " + noEngineDuringReset + " (true) is not allowed when allowNoEngine is " + allowNoEngine + " (false)";
boolean locked = true;
if (noEngineDuringReset) {
locked = engineResetLock.readLock().tryLock();
} else {
assert assertCurrentThreadWithEngine();
engineResetLock.readLock().lock();
}
if (locked) {
try {
var engine = getCurrentEngine(allowNoEngine);
return operation.apply(engine);
} finally {
engineResetLock.readLock().unlock();
}
} else {
return operation.apply(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5266,6 +5266,65 @@ public void prepareForEngineReset() throws IOException {
closeShards(shard);
}

public void testTryWithEngineOrNull() throws Exception {
final var preparedForReset = new AtomicBoolean();
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
if (preparedForReset.get()) {
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
} else {
return new InternalEngine(config) {
@Override
public void prepareForEngineReset() throws IOException {
assertTrue(preparedForReset.compareAndSet(false, true));
}
};
}
});
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();

final var release = new CountDownLatch(1);
final var reset = new PlainActionFuture<Void>();
final var resetEngineThread = new Thread(() -> {
try {
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
try (permit) {
shard.resetEngine(newEngine -> {
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
safeAwait(release);
});
assertThat(preparedForReset.get(), equalTo(true));
l.onResponse(null);
}
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
} catch (Exception e) {
reset.onFailure(e);
}
});
resetEngineThread.start();

assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));

shard.tryWithEngineOrNull(engine -> {
assertNull(engine);
assertThat(engineResetLock.isReadLocked(), equalTo(false));
return null;
});

release.countDown();
safeGet(reset);
assertThat(engineResetLock.isWriteLocked(), equalTo(false));

shard.tryWithEngineOrNull(engine -> {
assertThat(engine, instanceOf(ReadOnlyEngine.class));
assertThat(engineResetLock.isReadLocked(), equalTo(true));
return null;
});

resetEngineThread.join();
closeShards(shard);
}

public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception {
final var lazyShard = new AtomicReference<IndexShard>();
final var lazyEngineConfig = new AtomicReference<EngineConfig>();
Expand Down