diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6641fd8cebe44..9fed65186abd3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1352,11 +1352,12 @@ public void externalRefresh(String source, ActionListener * Returns how many bytes we are currently moving from heap to disk */ public long getWritingBytes() { - Engine engine = getEngineOrNull(); - if (engine == null) { - return 0; - } - return engine.getWritingBytes(); + return tryWithEngineOrNull(engine -> { + if (engine == null) { + return 0L; + } + return engine.getWritingBytes(); + }); } public RefreshStats refreshStats() { @@ -1371,12 +1372,13 @@ public RefreshStats refreshStats() { } public FlushStats flushStats() { - final Engine engine = getEngineOrNull(); - return new FlushStats( - flushMetric.count(), - periodicFlushMetric.count(), - TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()), - engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L + return tryWithEngineOrNull( + engine -> new FlushStats( + flushMetric.count(), + periodicFlushMetric.count(), + TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()), + engine != null ? 
engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L + ) ); } @@ -1402,30 +1404,33 @@ public SeqNoStats seqNoStats() { } public IndexingStats indexingStats() { - Engine engine = getEngineOrNull(); - final boolean throttled; - final long throttleTimeInMillis; - if (engine == null) { - throttled = false; - throttleTimeInMillis = 0; - } else { - throttled = engine.isThrottled(); - throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); - } + return tryWithEngineOrNull(engine -> { + final boolean throttled; + final long throttleTimeInMillis; + if (engine == null) { + throttled = false; + throttleTimeInMillis = 0; + } else { + throttled = engine.isThrottled(); + throttleTimeInMillis = engine.getIndexThrottleTimeInMillis(); + } - long currentTimeInNanos = getRelativeTimeInNanos(); - // We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started. - // In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero). - long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L; - return internalIndexingStats.stats( - throttled, - throttleTimeInMillis, - indexingTimeBeforeShardStartedInNanos, - indexingTaskExecutionTimeBeforeShardStartedInNanos, - timeSinceShardStartedInNanos, - currentTimeInNanos, - recentIndexingLoadAtShardStarted - ); + long currentTimeInNanos = getRelativeTimeInNanos(); + // We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started. + // In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero). + long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) + ? 
(currentTimeInNanos - startedRelativeTimeInNanos) + : 0L; + return internalIndexingStats.stats( + throttled, + throttleTimeInMillis, + indexingTimeBeforeShardStartedInNanos, + indexingTaskExecutionTimeBeforeShardStartedInNanos, + timeSinceShardStartedInNanos, + currentTimeInNanos, + recentIndexingLoadAtShardStarted + ); + }); } public SearchStats searchStats(String... groups) { @@ -2518,15 +2523,16 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { - Engine engine = getEngineOrNull(); - if (engine == null) { - return 0; - } - try { - return engine.getIndexBufferRAMBytesUsed(); - } catch (AlreadyClosedException ex) { - return 0; - } + return tryWithEngineOrNull(engine -> { + if (engine == null) { + return 0L; + } + try { + return engine.getIndexBufferRAMBytesUsed(); + } catch (AlreadyClosedException ex) { + return 0L; + } + }); } public void addShardFailureCallback(Consumer onShardFailure) { @@ -3387,6 +3393,10 @@ private void doCheckIndex() throws IOException { recoveryState.getVerifyIndex().checkIndexTime(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - timeNS))); } + /** + * @deprecated use {@link #withEngine(Function)} instead, which considers the situation an engine may be reset / changed. + */ + @Deprecated Engine getEngine() { engineResetLock.readLock().lock(); try { @@ -3399,7 +3409,10 @@ Engine getEngine() { /** * NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is * closed. + * + * @deprecated use the try/withEngine* family of functions instead, which consider the situation an engine may be reset / changed. 
*/ + @Deprecated public Engine getEngineOrNull() { engineResetLock.readLock().lock(); try { @@ -3425,20 +3438,35 @@ private Engine getAndSetCurrentEngine(Engine newEngine) { /** * Executes an operation while preventing the shard's engine instance to be reset during the execution. - * The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed. + * The operation might be executed with a {@code null} engine instance in case the engine/shard is closed or the engine is already + * being reset. During an engine reset, the function does not block and instead runs the operation with a {@code null} engine. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + */ + public R tryWithEngineOrNull(Function operation) { + return withEngine(operation, true, false); + } + + /** + * Executes an operation while preventing the shard's engine instance to be reset during the execution. + * The operation might be executed with a {@code null} engine instance in case the engine/shard is closed. The function may block + * during an engine reset. The engine might be closed while the operation is executed. * * @param operation the operation to execute * @return the result of the operation * @param the type of the result */ public R withEngineOrNull(Function operation) { - return withEngine(operation, true); + return withEngine(operation, true, true); } /** * Executes an operation while preventing the shard's engine instance to be reset during the execution. * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The - * engine might be closed while the operation is executed. + * function may block during an engine reset. The engine might be closed while the operation is executed. 
* * @param operation the operation to execute * @return the result of the operation @@ -3446,14 +3474,14 @@ public R withEngineOrNull(Function operation) { * @throws AlreadyClosedException if the current engine instance is {@code null}. */ public R withEngine(Function operation) { - return withEngine(operation, false); + return withEngine(operation, false, true); } /** * Executes an operation (potentially throwing a checked exception) while preventing the shard's engine instance to be reset during the * execution. * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The - * engine might be closed while the operation is executed. + * function may block during an engine reset. The engine might be closed while the operation is executed. * * @param operation the operation to execute * @return the result of the operation @@ -3481,22 +3509,38 @@ public R withEngineException(CheckedFunction the type of the result */ - private R withEngine(Function operation, boolean allowNoEngine) { - assert assertCurrentThreadWithEngine(); + private R withEngine(Function operation, boolean allowNoEngine, boolean blockIfResetting) { assert operation != null; - - engineResetLock.readLock().lock(); - try { - var engine = getCurrentEngine(allowNoEngine); - return operation.apply(engine); - } finally { - engineResetLock.readLock().unlock(); + assert blockIfResetting == false || assertCurrentThreadWithEngine(); // assert current thread can block on engine resets + boolean locked = true; + if (blockIfResetting) { + engineResetLock.readLock().lock(); + } else { + if (allowNoEngine == false) { + assert false : "blockIfResetting (false) only allowed with allowNoEngine (true)"; + throw new IllegalArgumentException("blockIfResetting (false) only allowed with allowNoEngine (true)"); + } + locked = engineResetLock.readLock().tryLock(); + } + if (locked) { + try { + var engine = getCurrentEngine(allowNoEngine); + return 
operation.apply(engine); + } finally { + engineResetLock.readLock().unlock(); + } + } else { + return operation.apply(null); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b40c7327c44d1..9101ba4bdc4b4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5314,6 +5314,67 @@ public void prepareForEngineReset() throws IOException { closeShards(shard); } + public void testTryWithEngineOrNull() throws Exception { + final var preparedForReset = new AtomicBoolean(); + final var shard = newStartedShard(true, Settings.EMPTY, config -> { + if (preparedForReset.get()) { + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); + } else { + return new InternalEngine(config) { + @Override + public void prepareForEngineReset() throws IOException { + assertTrue(preparedForReset.compareAndSet(false, true)); + } + }; + } + }); + final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock(); + + final var release = new CountDownLatch(1); + final var reset = new PlainActionFuture(); + final var resetEngineThread = new Thread(() -> { + try { + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { + try (permit) { + shard.resetEngine(newEngine -> { + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); + safeAwait(release); + }); + assertThat(preparedForReset.get(), equalTo(true)); + l.onResponse(null); + } + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); + } catch (Exception e) { + reset.onFailure(e); + } + }); + resetEngineThread.start(); + + assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true))); + + shard.tryWithEngineOrNull(engine -> { + assertNull(engine); + 
assertThat(engineResetLock.isReadLocked(), equalTo(false)); + assertThat(engineResetLock.isWriteLocked(), equalTo(true)); + return null; + }); + + release.countDown(); + safeGet(reset); + assertThat(engineResetLock.isWriteLocked(), equalTo(false)); + + shard.tryWithEngineOrNull(engine -> { + assertThat(engine, instanceOf(ReadOnlyEngine.class)); + assertThat(engineResetLock.isReadLocked(), equalTo(true)); + assertThat(engineResetLock.isWriteLocked(), equalTo(false)); + return null; + }); + + resetEngineThread.join(); + closeShards(shard); + } + public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception { final var lazyShard = new AtomicReference(); final var lazyEngineConfig = new AtomicReference();