Skip to content

Commit feaa580

Browse files
authored
Add tryWithEngineOrNull (#132000)
Also deprecate the old-style getEngine/getEngineOrNull methods. Apply the new method to several callers that do not need to wait for an engine reset to finish and can work with a null engine. These are typically periodic operations that can skip a shard whose engine is being reset and revisit it next time. Also return empty values for a few stats when the engine is being reset; these are stats that are already returned empty from a hollow engine. Relates ES-11457
1 parent 2353ec6 commit feaa580

File tree

2 files changed

+162
-57
lines changed

2 files changed

+162
-57
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 101 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,11 +1352,12 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>
13521352
* Returns how many bytes we are currently moving from heap to disk
13531353
*/
13541354
public long getWritingBytes() {
1355-
Engine engine = getEngineOrNull();
1356-
if (engine == null) {
1357-
return 0;
1358-
}
1359-
return engine.getWritingBytes();
1355+
return tryWithEngineOrNull(engine -> {
1356+
if (engine == null) {
1357+
return 0L;
1358+
}
1359+
return engine.getWritingBytes();
1360+
});
13601361
}
13611362

13621363
public RefreshStats refreshStats() {
@@ -1371,12 +1372,13 @@ public RefreshStats refreshStats() {
13711372
}
13721373

13731374
public FlushStats flushStats() {
1374-
final Engine engine = getEngineOrNull();
1375-
return new FlushStats(
1376-
flushMetric.count(),
1377-
periodicFlushMetric.count(),
1378-
TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()),
1379-
engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L
1375+
return tryWithEngineOrNull(
1376+
engine -> new FlushStats(
1377+
flushMetric.count(),
1378+
periodicFlushMetric.count(),
1379+
TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()),
1380+
engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L
1381+
)
13801382
);
13811383
}
13821384

@@ -1402,30 +1404,33 @@ public SeqNoStats seqNoStats() {
14021404
}
14031405

14041406
public IndexingStats indexingStats() {
1405-
Engine engine = getEngineOrNull();
1406-
final boolean throttled;
1407-
final long throttleTimeInMillis;
1408-
if (engine == null) {
1409-
throttled = false;
1410-
throttleTimeInMillis = 0;
1411-
} else {
1412-
throttled = engine.isThrottled();
1413-
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
1414-
}
1407+
return tryWithEngineOrNull(engine -> {
1408+
final boolean throttled;
1409+
final long throttleTimeInMillis;
1410+
if (engine == null) {
1411+
throttled = false;
1412+
throttleTimeInMillis = 0;
1413+
} else {
1414+
throttled = engine.isThrottled();
1415+
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
1416+
}
14151417

1416-
long currentTimeInNanos = getRelativeTimeInNanos();
1417-
// We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value, i.e. the shard has not started.
1418-
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
1419-
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L;
1420-
return internalIndexingStats.stats(
1421-
throttled,
1422-
throttleTimeInMillis,
1423-
indexingTimeBeforeShardStartedInNanos,
1424-
indexingTaskExecutionTimeBeforeShardStartedInNanos,
1425-
timeSinceShardStartedInNanos,
1426-
currentTimeInNanos,
1427-
recentIndexingLoadAtShardStarted
1428-
);
1418+
long currentTimeInNanos = getRelativeTimeInNanos();
1419+
// We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value, i.e. the shard has not started.
1420+
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
1421+
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L)
1422+
? (currentTimeInNanos - startedRelativeTimeInNanos)
1423+
: 0L;
1424+
return internalIndexingStats.stats(
1425+
throttled,
1426+
throttleTimeInMillis,
1427+
indexingTimeBeforeShardStartedInNanos,
1428+
indexingTaskExecutionTimeBeforeShardStartedInNanos,
1429+
timeSinceShardStartedInNanos,
1430+
currentTimeInNanos,
1431+
recentIndexingLoadAtShardStarted
1432+
);
1433+
});
14291434
}
14301435

14311436
public SearchStats searchStats(String... groups) {
@@ -2518,15 +2523,16 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState
25182523
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
25192524
*/
25202525
public long getIndexBufferRAMBytesUsed() {
2521-
Engine engine = getEngineOrNull();
2522-
if (engine == null) {
2523-
return 0;
2524-
}
2525-
try {
2526-
return engine.getIndexBufferRAMBytesUsed();
2527-
} catch (AlreadyClosedException ex) {
2528-
return 0;
2529-
}
2526+
return tryWithEngineOrNull(engine -> {
2527+
if (engine == null) {
2528+
return 0L;
2529+
}
2530+
try {
2531+
return engine.getIndexBufferRAMBytesUsed();
2532+
} catch (AlreadyClosedException ex) {
2533+
return 0L;
2534+
}
2535+
});
25302536
}
25312537

25322538
public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
@@ -3387,6 +3393,10 @@ private void doCheckIndex() throws IOException {
33873393
recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS)));
33883394
}
33893395

3396+
/**
3397+
* @deprecated use {@link #withEngine(Function)} instead, which accounts for the possibility that the engine is reset or changed.
3398+
*/
3399+
@Deprecated
33903400
Engine getEngine() {
33913401
engineResetLock.readLock().lock();
33923402
try {
@@ -3399,7 +3409,10 @@ Engine getEngine() {
33993409
/**
34003410
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
34013411
* closed.
3412+
*
3413+
* @deprecated use the try/withEngine* family of functions instead, which account for the possibility that the engine is reset or changed.
34023414
*/
3415+
@Deprecated
34033416
public Engine getEngineOrNull() {
34043417
engineResetLock.readLock().lock();
34053418
try {
@@ -3425,35 +3438,50 @@ private Engine getAndSetCurrentEngine(Engine newEngine) {
34253438

34263439
/**
34273440
* Executes an operation while preventing the shard's engine instance from being reset during the execution.
3428-
* The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed.
3441+
* The operation might be executed with a {@code null} engine instance in case the engine/shard is closed or the engine is already
3442+
* being reset. During an engine reset, this means the function will not block and will instead execute the operation with a {@code null} engine. The engine might be closed
3443+
* while the operation is executed.
3444+
*
3445+
* @param operation the operation to execute
3446+
* @return the result of the operation
3447+
* @param <R> the type of the result
3448+
*/
3449+
public <R> R tryWithEngineOrNull(Function<Engine, R> operation) {
3450+
return withEngine(operation, true, false);
3451+
}
3452+
3453+
/**
3454+
* Executes an operation while preventing the shard's engine instance from being reset during the execution.
3455+
* The operation might be executed with a {@code null} engine instance in case the engine/shard is closed. The function may block
3456+
* during an engine reset. The engine might be closed while the operation is executed.
34293457
*
34303458
* @param operation the operation to execute
34313459
* @return the result of the operation
34323460
* @param <R> the type of the result
34333461
*/
34343462
public <R> R withEngineOrNull(Function<Engine, R> operation) {
3435-
return withEngine(operation, true);
3463+
return withEngine(operation, true, true);
34363464
}
34373465

34383466
/**
34393467
* Executes an operation while preventing the shard's engine instance from being reset during the execution.
34403468
* If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The
3441-
* engine might be closed while the operation is executed.
3469+
* function may block during an engine reset. The engine might be closed while the operation is executed.
34423470
*
34433471
* @param operation the operation to execute
34443472
* @return the result of the operation
34453473
* @param <R> the type of the result
34463474
* @throws AlreadyClosedException if the current engine instance is {@code null}.
34473475
*/
34483476
public <R> R withEngine(Function<Engine, R> operation) {
3449-
return withEngine(operation, false);
3477+
return withEngine(operation, false, true);
34503478
}
34513479

34523480
/**
34533481
* Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance from being reset during the
34543482
* execution.
34553483
* If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The
3456-
* engine might be closed while the operation is executed.
3484+
* function may block during an engine reset. The engine might be closed while the operation is executed.
34573485
*
34583486
* @param operation the operation to execute
34593487
* @return the result of the operation
@@ -3481,22 +3509,38 @@ public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R,
34813509
* The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}.
34823510
* When {@code allowNoEngine} is set to {@code false} the method will throw an {@link AlreadyClosedException} if the current engine
34833511
* instance is {@code null}.
3512+
* The parameter {@code blockIfResetting}, when set to {@code false}, causes the operation to be executed with a {@code null} engine instance if the engine is
3513+
* being reset at the time the function is invoked. This is useful for operations that access a shard's engine periodically, e.g.,
3514+
* to get statistics, and are OK to skip it if the engine is being reset, rather than blocking until reset is complete. Note that
3515+
* it is illegal to set this parameter to {@code false} if {@code allowNoEngine} is set to {@code false}.
34843516
*
34853517
* @param operation the operation to execute
34863518
* @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null}
34873519
* @return the result of the operation
34883520
* @param <R> the type of the result
34893521
*/
3490-
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
3491-
assert assertCurrentThreadWithEngine();
3522+
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine, boolean blockIfResetting) {
34923523
assert operation != null;
3493-
3494-
engineResetLock.readLock().lock();
3495-
try {
3496-
var engine = getCurrentEngine(allowNoEngine);
3497-
return operation.apply(engine);
3498-
} finally {
3499-
engineResetLock.readLock().unlock();
3524+
assert blockIfResetting == false || assertCurrentThreadWithEngine(); // assert current thread can block on engine resets
3525+
boolean locked = true;
3526+
if (blockIfResetting) {
3527+
engineResetLock.readLock().lock();
3528+
} else {
3529+
if (allowNoEngine == false) {
3530+
assert false : "blockIfResetting (false) only allowed with allowNoEngine (true)";
3531+
throw new IllegalArgumentException("blockIfResetting (false) only allowed with allowNoEngine (true)");
3532+
}
3533+
locked = engineResetLock.readLock().tryLock();
3534+
}
3535+
if (locked) {
3536+
try {
3537+
var engine = getCurrentEngine(allowNoEngine);
3538+
return operation.apply(engine);
3539+
} finally {
3540+
engineResetLock.readLock().unlock();
3541+
}
3542+
} else {
3543+
return operation.apply(null);
35003544
}
35013545
}
35023546

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5314,6 +5314,67 @@ public void prepareForEngineReset() throws IOException {
53145314
closeShards(shard);
53155315
}
53165316

5317+
public void testTryWithEngineOrNull() throws Exception {
5318+
final var preparedForReset = new AtomicBoolean();
5319+
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
5320+
if (preparedForReset.get()) {
5321+
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
5322+
} else {
5323+
return new InternalEngine(config) {
5324+
@Override
5325+
public void prepareForEngineReset() throws IOException {
5326+
assertTrue(preparedForReset.compareAndSet(false, true));
5327+
}
5328+
};
5329+
}
5330+
});
5331+
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();
5332+
5333+
final var release = new CountDownLatch(1);
5334+
final var reset = new PlainActionFuture<Void>();
5335+
final var resetEngineThread = new Thread(() -> {
5336+
try {
5337+
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
5338+
try (permit) {
5339+
shard.resetEngine(newEngine -> {
5340+
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
5341+
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
5342+
safeAwait(release);
5343+
});
5344+
assertThat(preparedForReset.get(), equalTo(true));
5345+
l.onResponse(null);
5346+
}
5347+
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
5348+
} catch (Exception e) {
5349+
reset.onFailure(e);
5350+
}
5351+
});
5352+
resetEngineThread.start();
5353+
5354+
assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));
5355+
5356+
shard.tryWithEngineOrNull(engine -> {
5357+
assertNull(engine);
5358+
assertThat(engineResetLock.isReadLocked(), equalTo(false));
5359+
assertThat(engineResetLock.isWriteLocked(), equalTo(true));
5360+
return null;
5361+
});
5362+
5363+
release.countDown();
5364+
safeGet(reset);
5365+
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
5366+
5367+
shard.tryWithEngineOrNull(engine -> {
5368+
assertThat(engine, instanceOf(ReadOnlyEngine.class));
5369+
assertThat(engineResetLock.isReadLocked(), equalTo(true));
5370+
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
5371+
return null;
5372+
});
5373+
5374+
resetEngineThread.join();
5375+
closeShards(shard);
5376+
}
5377+
53175378
public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception {
53185379
final var lazyShard = new AtomicReference<IndexShard>();
53195380
final var lazyEngineConfig = new AtomicReference<EngineConfig>();

0 commit comments

Comments
 (0)