Skip to content

Commit 2331e6b

Browse files
committed
Revert changes not related to metrics
1 parent ef0cc21 commit 2331e6b

File tree

3 files changed

+59
-159
lines changed

3 files changed

+59
-159
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,16 @@ public FlushRequest(StreamInput in) throws IOException {
4545
}
4646

4747
/**
48-
* Returns {@code true} iff a flush should block (e.g., if the engine is being reset or
49-
* if a another flush operation is already running). Otherwise {@code false}
48+
* Returns {@code true} iff a flush should block
49+
* if a another flush operation is already running. Otherwise {@code false}
5050
*/
5151
public boolean waitIfOngoing() {
5252
return this.waitIfOngoing;
5353
}
5454

5555
/**
56-
* if set to {@code true} the flush will block (e.g., if the engine is being reset or
57-
* if a another flush operation is already running) until the flush can be performed.
56+
* if set to {@code true} the flush will block
57+
* if a another flush operation is already running until the flush can be performed.
5858
* The default is <code>true</code>
5959
*/
6060
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 52 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,7 +1352,7 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>
13521352
* Returns how many bytes we are currently moving from heap to disk
13531353
*/
13541354
public long getWritingBytes() {
1355-
return withEngineOrNullIfBeingReset(engine -> {
1355+
return tryWithEngineOrNull(engine -> {
13561356
if (engine == null) {
13571357
return 0L;
13581358
}
@@ -1372,7 +1372,7 @@ public RefreshStats refreshStats() {
13721372
}
13731373

13741374
public FlushStats flushStats() {
1375-
return withEngineOrNullIfBeingReset(
1375+
return tryWithEngineOrNull(
13761376
engine -> new FlushStats(
13771377
flushMetric.count(),
13781378
periodicFlushMetric.count(),
@@ -1404,7 +1404,7 @@ public SeqNoStats seqNoStats() {
14041404
}
14051405

14061406
public IndexingStats indexingStats() {
1407-
return withEngineOrNullIfBeingReset(engine -> {
1407+
return tryWithEngineOrNull(engine -> {
14081408
final boolean throttled;
14091409
final long throttleTimeInMillis;
14101410
if (engine == null) {
@@ -1528,7 +1528,7 @@ public boolean flush(FlushRequest request) {
15281528
* @param request the flush request
15291529
* @param listener to notify after full durability has been achieved.
15301530
* <code>false</code> if <code>waitIfOngoing==false</code>
1531-
* and an ongoing request or engine reset is detected, else <code>true</code>.
1531+
* and an ongoing request is detected, else <code>true</code>.
15321532
* If <code>false</code> is returned, no flush happened.
15331533
*/
15341534
public void flush(FlushRequest request, ActionListener<Boolean> listener) {
@@ -1543,23 +1543,11 @@ public void flush(FlushRequest request, ActionListener<Boolean> listener) {
15431543
*/
15441544
verifyNotClosed();
15451545
final long startTime = System.nanoTime();
1546-
final var finalListener = ActionListener.runBefore(
1547-
l.map(Engine.FlushResult::flushPerformed),
1548-
() -> flushMetric.inc(System.nanoTime() - startTime)
1546+
getEngine().flush(
1547+
force,
1548+
waitIfOngoing,
1549+
ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime))
15491550
);
1550-
if (waitIfOngoing) {
1551-
getEngine().flush(force, waitIfOngoing, finalListener);
1552-
} else {
1553-
withEngineOrNullIfBeingReset(engine -> {
1554-
if (engine == null) {
1555-
logger.trace("detected engine is closed or being reset, not blocking");
1556-
finalListener.onResponse(Engine.FlushResult.NO_FLUSH);
1557-
} else {
1558-
engine.flush(force, waitIfOngoing, finalListener);
1559-
}
1560-
return null;
1561-
});
1562-
}
15631551
});
15641552
}
15651553

@@ -1590,12 +1578,8 @@ public long readGlobalCheckpointForRecovery(Map<String, String> commitUserData)
15901578
*/
15911579
public void trimTranslog() {
15921580
verifyNotClosed();
1593-
withEngineOrNullIfBeingReset(engine -> {
1594-
if (engine != null) {
1595-
engine.trimUnreferencedTranslogFiles();
1596-
}
1597-
return null;
1598-
});
1581+
final Engine engine = getEngine();
1582+
engine.trimUnreferencedTranslogFiles();
15991583
}
16001584

16011585
/**
@@ -2539,7 +2523,7 @@ private void verifyNotClosed(Exception suppressed) throws IllegalIndexShardState
25392523
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
25402524
*/
25412525
public long getIndexBufferRAMBytesUsed() {
2542-
return withEngineOrNullIfBeingReset(engine -> {
2526+
return tryWithEngineOrNull(engine -> {
25432527
if (engine == null) {
25442528
return 0L;
25452529
}
@@ -2560,37 +2544,33 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {
25602544
* indexing operation, so we can flush the index.
25612545
*/
25622546
public void flushOnIdle(long inactiveTimeNS) {
2563-
withEngineOrNullIfBeingReset(engineOrNull -> {
2564-
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2565-
boolean wasActive = active.getAndSet(false);
2566-
if (wasActive) {
2567-
logger.debug("flushing shard on inactive");
2568-
threadPool.executor(ThreadPool.Names.FLUSH)
2569-
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
2570-
@Override
2571-
public void onResponse(Boolean flushed) {
2572-
if (flushed == false) {
2573-
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2574-
// will retry (#87888)
2575-
active.set(true);
2576-
}
2577-
periodicFlushMetric.inc();
2547+
Engine engineOrNull = getEngineOrNull();
2548+
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
2549+
boolean wasActive = active.getAndSet(false);
2550+
if (wasActive) {
2551+
logger.debug("flushing shard on inactive");
2552+
threadPool.executor(ThreadPool.Names.FLUSH)
2553+
.execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
2554+
@Override
2555+
public void onResponse(Boolean flushed) {
2556+
if (flushed == false) {
2557+
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
2558+
// will retry (#87888)
2559+
active.set(true);
25782560
}
2561+
periodicFlushMetric.inc();
2562+
}
25792563

2580-
@Override
2581-
public void onFailure(Exception e) {
2582-
if (state != IndexShardState.CLOSED) {
2583-
active.set(true);
2584-
logger.warn("failed to flush shard on inactive", e);
2585-
}
2564+
@Override
2565+
public void onFailure(Exception e) {
2566+
if (state != IndexShardState.CLOSED) {
2567+
active.set(true);
2568+
logger.warn("failed to flush shard on inactive", e);
25862569
}
2587-
}));
2588-
}
2589-
} else {
2590-
logger.trace(() -> "flush on idle skipped as it is either closed, engine being reset, or has been recently written to");
2570+
}
2571+
}));
25912572
}
2592-
return null;
2593-
});
2573+
}
25942574
}
25952575

25962576
public boolean isActive() {
@@ -2781,21 +2761,19 @@ public IndexEventListener getIndexEventListener() {
27812761
* setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread.
27822762
*/
27832763
public void activateThrottling() {
2784-
withEngineOrNullIfBeingReset(engine -> {
2785-
if (engine != null) {
2786-
engine.activateThrottling();
2787-
}
2788-
return null;
2789-
});
2764+
try {
2765+
getEngine().activateThrottling();
2766+
} catch (AlreadyClosedException ex) {
2767+
// ignore
2768+
}
27902769
}
27912770

27922771
public void deactivateThrottling() {
2793-
withEngineOrNullIfBeingReset(engine -> {
2794-
if (engine != null) {
2795-
engine.deactivateThrottling();
2796-
}
2797-
return null;
2798-
});
2772+
try {
2773+
getEngine().deactivateThrottling();
2774+
} catch (AlreadyClosedException ex) {
2775+
// ignore
2776+
}
27992777
}
28002778

28012779
private void suspendThrottling() {
@@ -2840,16 +2818,12 @@ private void handleRefreshException(Exception e) {
28402818
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
28412819
*/
28422820
public void writeIndexingBuffer() {
2843-
withEngineOrNullIfBeingReset(engine -> {
2844-
if (engine != null) {
2845-
try {
2846-
engine.writeIndexingBuffer();
2847-
} catch (Exception e) {
2848-
handleRefreshException(e);
2849-
}
2850-
}
2851-
return null;
2852-
});
2821+
try {
2822+
Engine engine = getEngine();
2823+
engine.writeIndexingBuffer();
2824+
} catch (Exception e) {
2825+
handleRefreshException(e);
2826+
}
28532827
}
28542828

28552829
/**
@@ -3436,7 +3410,7 @@ Engine getEngine() {
34363410
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
34373411
* closed.
34383412
*
3439-
* @deprecated use the withEngine* family of functions instead, which consider the situation an engine may be reset / changed.
3413+
* @deprecated use the try/withEngine* family of functions instead, which consider the situation an engine may be reset / changed.
34403414
*/
34413415
@Deprecated
34423416
public Engine getEngineOrNull() {
@@ -3471,7 +3445,7 @@ private Engine getAndSetCurrentEngine(Engine newEngine) {
34713445
* @return the result of the operation
34723446
* @param <R> the type of the result
34733447
*/
3474-
public <R> R withEngineOrNullIfBeingReset(Function<Engine, R> operation) {
3448+
public <R> R tryWithEngineOrNull(Function<Engine, R> operation) {
34753449
return withEngine(operation, true, true);
34763450
}
34773451

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 3 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -4369,80 +4369,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
43694369
}
43704370
}
43714371

4372-
@TestLogging(reason = "testing traces of concurrent flush and engine reset", value = "org.elasticsearch.index.shard.IndexShard:TRACE")
4373-
public void testFlushOnIdleDoesNotWaitWhileEngineIsReset() throws Exception {
4374-
final var preparedForReset = new AtomicBoolean();
4375-
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
4376-
if (preparedForReset.get()) {
4377-
return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true);
4378-
} else {
4379-
return new InternalEngine(config) {
4380-
@Override
4381-
public void prepareForEngineReset() throws IOException {
4382-
assertTrue(preparedForReset.compareAndSet(false, true));
4383-
}
4384-
};
4385-
}
4386-
});
4387-
final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock();
4388-
4389-
final var release = new CountDownLatch(1);
4390-
final var reset = new PlainActionFuture<Void>();
4391-
final var resetEngineThread = new Thread(() -> {
4392-
try {
4393-
shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> {
4394-
try (permit) {
4395-
shard.resetEngine(newEngine -> {
4396-
assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true));
4397-
assertThat(newEngine, instanceOf(ReadOnlyEngine.class));
4398-
safeAwait(release);
4399-
});
4400-
assertThat(preparedForReset.get(), equalTo(true));
4401-
l.onResponse(null);
4402-
}
4403-
}), EsExecutors.DIRECT_EXECUTOR_SERVICE);
4404-
} catch (Exception e) {
4405-
reset.onFailure(e);
4406-
}
4407-
});
4408-
resetEngineThread.start();
4409-
4410-
assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));
4411-
4412-
try (var mockLog = MockLog.capture(IndexShard.class)) {
4413-
mockLog.addExpectation(
4414-
new MockLog.SeenEventExpectation(
4415-
"should see flush on idle returning since it will not wait for the engine reset",
4416-
IndexShard.class.getCanonicalName(),
4417-
Level.TRACE,
4418-
"flush on idle skipped"
4419-
)
4420-
);
4421-
shard.flushOnIdle(0);
4422-
mockLog.awaitAllExpectationsMatched();
4423-
}
4424-
4425-
try (var mockLog = MockLog.capture(IndexShard.class)) {
4426-
mockLog.addExpectation(
4427-
new MockLog.SeenEventExpectation(
4428-
"should see flush returning since it will not wait for the engine reset",
4429-
IndexShard.class.getCanonicalName(),
4430-
Level.TRACE,
4431-
"detected engine is closed or being reset, not blocking"
4432-
)
4433-
);
4434-
// A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately
4435-
assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)));
4436-
mockLog.awaitAllExpectationsMatched();
4437-
}
4438-
4439-
release.countDown();
4440-
safeGet(reset);
4441-
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
4442-
resetEngineThread.join();
4443-
closeShards(shard);
4444-
}
4445-
44464372
public void testFlushOnIdleAfterOp() throws Exception {
44474373
// Holding the write lock makes the index/delete op to halt before being processed by the engine
44484374
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
@@ -5340,7 +5266,7 @@ public void prepareForEngineReset() throws IOException {
53405266
closeShards(shard);
53415267
}
53425268

5343-
public void testWithEngineOrNullIfBeingReset() throws Exception {
5269+
public void testTryWithEngineOrNull() throws Exception {
53445270
final var preparedForReset = new AtomicBoolean();
53455271
final var shard = newStartedShard(true, Settings.EMPTY, config -> {
53465272
if (preparedForReset.get()) {
@@ -5379,7 +5305,7 @@ public void prepareForEngineReset() throws IOException {
53795305

53805306
assertBusy(() -> assertThat(engineResetLock.isWriteLocked(), equalTo(true)));
53815307

5382-
shard.withEngineOrNullIfBeingReset(engine -> {
5308+
shard.tryWithEngineOrNull(engine -> {
53835309
assertNull(engine);
53845310
assertThat(engineResetLock.isReadLocked(), equalTo(false));
53855311
return null;
@@ -5389,7 +5315,7 @@ public void prepareForEngineReset() throws IOException {
53895315
safeGet(reset);
53905316
assertThat(engineResetLock.isWriteLocked(), equalTo(false));
53915317

5392-
shard.withEngineOrNullIfBeingReset(engine -> {
5318+
shard.tryWithEngineOrNull(engine -> {
53935319
assertThat(engine, instanceOf(ReadOnlyEngine.class));
53945320
assertThat(engineResetLock.isReadLocked(), equalTo(true));
53955321
return null;

0 commit comments

Comments
 (0)