Skip to content

Commit 9a357d1

Browse files
committed
Draft ES-10826
1 parent 19fe0a4 commit 9a357d1

File tree

3 files changed

+132
-28
lines changed

3 files changed

+132
-28
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.AbstractRefCounted;
13+
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
public final class EngineRef extends AbstractRefCounted implements Releasable {
18+
19+
private final Releasable releasable;
20+
private final Engine engine;
21+
22+
public EngineRef(Engine engine, Releasable releasable) {
23+
this.engine = engine;
24+
this.releasable = releasable;
25+
}
26+
27+
@Nullable
28+
public Engine getEngineOrNull() {
29+
return engine;
30+
}
31+
32+
@Override
33+
protected void closeInternal() {
34+
releasable.close();
35+
}
36+
37+
@Override
38+
public void close() {
39+
decRef();
40+
}
41+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Supplier;
19+
20+
class EngineReferenceManager {
21+
22+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
23+
private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op
24+
private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op
25+
26+
private volatile Engine current;
27+
28+
public EngineRef getEngineRef() {
29+
lock.readLock().lock();
30+
return new EngineRef(this.current, Releasables.assertOnce(releaseLock));
31+
}
32+
33+
@Nullable
34+
Engine getEngine() {
35+
return this.current;
36+
}
37+
38+
Releasable acquireEngineLock() {
39+
lock.writeLock().lock();
40+
return Releasables.assertOnce(releaseExclusiveLock);
41+
}
42+
43+
boolean isEngineLockHeldByCurrentThread() {
44+
return lock.writeLock().isHeldByCurrentThread();
45+
}
46+
47+
@Nullable
48+
Engine getEngineAndSet(Supplier<Engine> supplier) {
49+
assert supplier != null : "supplier cannot be null";
50+
assert isEngineLockHeldByCurrentThread();
51+
Engine previous = this.current;
52+
this.current = supplier.get();
53+
return previous;
54+
}
55+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240
// ensure happens-before relation between addRefreshListener() and postRecovery()
241241
private volatile SubscribableListener<Void> postRecoveryComplete;
242242
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
243+
244+
private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
245245
final EngineFactory engineFactory;
246246

247247
private final IndexingOperationListener indexingOperationListeners;
@@ -1613,10 +1613,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131613
Engine.IndexCommitRef indexCommit = null;
16141614
store.incRef();
16151615
try {
1616-
synchronized (engineMutex) {
1616+
try (var engineRef = engineReferenceManager.getEngineRef()) {
16171617
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181618
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619-
final Engine engine = getEngineOrNull();
1619+
final Engine engine = engineRef.getEngineOrNull();
16201620
if (engine != null) {
16211621
indexCommit = engine.acquireLastIndexCommit(false);
16221622
}
@@ -1776,14 +1776,14 @@ public CacheHelper getReaderCacheHelper() {
17761776
}
17771777

17781778
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1779+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
17801780
try {
17811781
synchronized (mutex) {
17821782
changeState(IndexShardState.CLOSED, reason);
17831783
}
17841784
checkAndCallWaitForEngineOrClosedShardListeners();
17851785
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1786+
final Engine engine = engineReferenceManager.getEngineAndSet(() -> null);
17871787
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881788
@Override
17891789
public void run() throws Exception {
@@ -1857,7 +1857,7 @@ public void prepareForIndexRecovery() {
18571857
throw new IndexShardNotRecoveringException(shardId, state);
18581858
}
18591859
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1860+
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
18611861
}
18621862

18631863
/**
@@ -1936,8 +1936,8 @@ private void doLocalRecovery(
19361936
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371937
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381938
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1939+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
1940+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
19411941
}
19421942
}, (recoveryCompleteListener, ignoredRef) -> {
19431943
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,13 +2167,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672167
+ recoveryState.getRecoverySource()
21682168
+ "] but got "
21692169
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2170+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
2171+
assert assertEngineReferenceIsNull("engine is running");
21722172
verifyNotClosed();
2173-
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2173+
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
21742174
final Engine newEngine = createEngine(config);
21752175
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2176+
var previous = engineReferenceManager.getEngineAndSet(() -> newEngine);
2177+
assert previous == null;
21772178
// We set active because we are now writing operations to the engine; this way,
21782179
// we can flush if we go idle after some time and become inactive.
21792180
active.set(true);
@@ -2241,7 +2242,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412242
}
22422243

22432244
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2245+
assert engineReferenceManager.isEngineLockHeldByCurrentThread();
22452246
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462247
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472248
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,9 +2253,9 @@ private void onNewEngine(Engine newEngine) {
22522253
*/
22532254
public void performRecoveryRestart() throws IOException {
22542255
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2256+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
22562257
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2258+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
22582259
resetRecoveryStage();
22592260
}
22602261
}
@@ -2264,7 +2265,7 @@ public void performRecoveryRestart() throws IOException {
22642265
*/
22652266
public void resetRecoveryStage() {
22662267
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2268+
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
22682269
if (state != IndexShardState.RECOVERING) {
22692270
throw new IndexShardNotRecoveringException(shardId, state);
22702271
}
@@ -3298,7 +3299,7 @@ Engine getEngine() {
32983299
* closed.
32993300
*/
33003301
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3302+
return engineReferenceManager.getEngine();
33023303
}
33033304

33043305
public void startRecovery(
@@ -4312,11 +4313,11 @@ public void resetEngine() {
43124313
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134314
assert waitForEngineOrClosedShardListeners.isDone();
43144315
try {
4315-
synchronized (engineMutex) {
4316+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
43164317
verifyNotClosed();
43174318
getEngine().prepareForEngineReset();
43184319
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4320+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> newEngine));
43204321
onNewEngine(newEngine);
43214322
}
43224323
onSettingsChanged();
@@ -4342,7 +4343,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424343
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434344
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444345
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4346+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
43464347
verifyNotClosed();
43474348
// we must create both new read-only engine and new read-write engine under the engine lock to ensure snapshotStoreMetadata,
43484349
// acquireXXXCommit and close works.
@@ -4357,7 +4358,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574358
) {
43584359
@Override
43594360
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4361+
try (var ignored = engineReferenceManager.getEngineRef()) {
43614362
if (newEngineReference.get() == null) {
43624363
throw new AlreadyClosedException("engine was closed");
43634364
}
@@ -4368,7 +4369,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
43684369

43694370
@Override
43704371
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4372+
try (var ignored = engineReferenceManager.getEngineRef()) {
43724373
if (newEngineReference.get() == null) {
43734374
throw new AlreadyClosedException("engine was closed");
43744375
}
@@ -4379,17 +4380,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
43794380
@Override
43804381
public void close() throws IOException {
43814382
Engine newEngine;
4382-
synchronized (engineMutex) {
4383+
try (var engineRef = engineReferenceManager.getEngineRef()) {
43834384
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4385+
if (newEngine == engineRef.getEngineOrNull()) {
43854386
// we successfully installed the new engine so do not close it.
43864387
newEngine = null;
43874388
}
43884389
}
43894390
IOUtils.close(super::close, newEngine);
43904391
}
43914392
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4393+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> readOnlyEngine));
43934394
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944395
onNewEngine(newEngineReference.get());
43954396
}
@@ -4403,9 +4404,9 @@ public void close() throws IOException {
44034404
);
44044405
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054406
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4407+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
44074408
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4409+
IOUtils.close(engineReferenceManager.getEngineAndSet(newEngineReference::get));
44094410
// We set active because we are now writing operations to the engine; this way,
44104411
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114412
active.set(true);
@@ -4516,4 +4517,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164517
l.onResponse(null);
45174518
}));
45184519
}
4520+
4521+
private boolean assertEngineReferenceIsNull(String message) {
4522+
// use accessor with no lock as this asserting method can be called anywhere,
4523+
// including under the refresh lock of the index reader.
4524+
assert engineReferenceManager.getEngine() == null : message;
4525+
return true;
4526+
}
45194527
}

0 commit comments

Comments
 (0)