Skip to content

Commit d9bcf62

Browse files
committed
Draft ES-10826
1 parent c211040 commit d9bcf62

File tree

3 files changed

+136
-28
lines changed

3 files changed

+136
-28
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.AbstractRefCounted;
13+
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
public final class EngineRef extends AbstractRefCounted implements Releasable {
18+
19+
private final Releasable releasable;
20+
private final Engine engine;
21+
22+
public EngineRef(Engine engine, Releasable releasable) {
23+
this.engine = engine;
24+
this.releasable = releasable;
25+
}
26+
27+
@Nullable
28+
public Engine getEngineOrNull() {
29+
return engine;
30+
}
31+
32+
@Override
33+
protected void closeInternal() {
34+
releasable.close();
35+
}
36+
37+
@Override
38+
public void close() {
39+
decRef();
40+
}
41+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Supplier;
19+
20+
class EngineReferenceManager {
21+
22+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
23+
private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op
24+
private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op
25+
26+
private volatile Engine current;
27+
28+
private Releasable acquireLock() {
29+
lock.readLock().lock();
30+
return Releasables.assertOnce(releaseLock);
31+
}
32+
33+
public Releasable acquireExclusiveLock() {
34+
lock.writeLock().lock();
35+
return Releasables.assertOnce(releaseExclusiveLock);
36+
}
37+
38+
public boolean isExclusiveLockHeldByCurrentThread() {
39+
return lock.writeLock().isHeldByCurrentThread();
40+
}
41+
42+
public EngineRef getEngineRef() {
43+
try (var ignored = acquireLock()) {
44+
return new EngineRef(this.current, acquireLock());
45+
}
46+
}
47+
48+
@Nullable
49+
public Engine getAndSet(Supplier<Engine> supplier) {
50+
assert supplier != null : "supplier cannot be null";
51+
assert isExclusiveLockHeldByCurrentThread();
52+
Engine previous = this.current;
53+
this.current = supplier.get();
54+
return previous;
55+
}
56+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240
// ensure happens-before relation between addRefreshListener() and postRecovery()
241241
private volatile SubscribableListener<Void> postRecoveryComplete;
242242
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
243+
244+
private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
245245
final EngineFactory engineFactory;
246246

247247
private final IndexingOperationListener indexingOperationListeners;
@@ -1613,10 +1613,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131613
Engine.IndexCommitRef indexCommit = null;
16141614
store.incRef();
16151615
try {
1616-
synchronized (engineMutex) {
1616+
try (var engineRef = engineReferenceManager.getEngineRef()) {
16171617
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181618
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619-
final Engine engine = getEngineOrNull();
1619+
final Engine engine = engineRef.getEngineOrNull();
16201620
if (engine != null) {
16211621
indexCommit = engine.acquireLastIndexCommit(false);
16221622
}
@@ -1776,14 +1776,14 @@ public CacheHelper getReaderCacheHelper() {
17761776
}
17771777

17781778
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1779+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
17801780
try {
17811781
synchronized (mutex) {
17821782
changeState(IndexShardState.CLOSED, reason);
17831783
}
17841784
checkAndCallWaitForEngineOrClosedShardListeners();
17851785
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1786+
final Engine engine = engineReferenceManager.getAndSet(() -> null);
17871787
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881788
@Override
17891789
public void run() throws Exception {
@@ -1857,7 +1857,7 @@ public void prepareForIndexRecovery() {
18571857
throw new IndexShardNotRecoveringException(shardId, state);
18581858
}
18591859
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1860+
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
18611861
}
18621862

18631863
/**
@@ -1936,8 +1936,8 @@ private void doLocalRecovery(
19361936
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371937
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381938
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1939+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
1940+
IOUtils.close(engineReferenceManager.getAndSet(() -> null));
19411941
}
19421942
}, (recoveryCompleteListener, ignoredRef) -> {
19431943
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,13 +2167,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672167
+ recoveryState.getRecoverySource()
21682168
+ "] but got "
21692169
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2170+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
2171+
assert assertEngineReferenceIsNull("engine is running");
21722172
verifyNotClosed();
2173-
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2173+
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
21742174
final Engine newEngine = createEngine(config);
21752175
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2176+
var previous = engineReferenceManager.getAndSet(() -> newEngine);
2177+
assert previous == null;
21772178
// We set active because we are now writing operations to the engine; this way,
21782179
// we can flush if we go idle after some time and become inactive.
21792180
active.set(true);
@@ -2241,7 +2242,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412242
}
22422243

22432244
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2245+
assert engineReferenceManager.isExclusiveLockHeldByCurrentThread();
22452246
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462247
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472248
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,9 +2253,9 @@ private void onNewEngine(Engine newEngine) {
22522253
*/
22532254
public void performRecoveryRestart() throws IOException {
22542255
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2256+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
22562257
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2258+
IOUtils.close(engineReferenceManager.getAndSet(() -> null));
22582259
resetRecoveryStage();
22592260
}
22602261
}
@@ -2264,7 +2265,7 @@ public void performRecoveryRestart() throws IOException {
22642265
*/
22652266
public void resetRecoveryStage() {
22662267
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2268+
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
22682269
if (state != IndexShardState.RECOVERING) {
22692270
throw new IndexShardNotRecoveringException(shardId, state);
22702271
}
@@ -3297,8 +3298,11 @@ Engine getEngine() {
32973298
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
32983299
* closed.
32993300
*/
3301+
@Deprecated
33003302
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3303+
try (var engineRef = engineReferenceManager.getEngineRef()) {
3304+
return engineRef.getEngineOrNull();
3305+
}
33023306
}
33033307

33043308
public void startRecovery(
@@ -4312,11 +4316,11 @@ public void resetEngine() {
43124316
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134317
assert waitForEngineOrClosedShardListeners.isDone();
43144318
try {
4315-
synchronized (engineMutex) {
4319+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43164320
verifyNotClosed();
43174321
getEngine().prepareForEngineReset();
43184322
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4323+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngine));
43204324
onNewEngine(newEngine);
43214325
}
43224326
onSettingsChanged();
@@ -4342,7 +4346,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424346
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434347
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444348
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4349+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43464350
verifyNotClosed();
43474351
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
43484352
// acquireXXXCommit and close works.
@@ -4357,7 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574361
) {
43584362
@Override
43594363
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4364+
try (var ignored = engineReferenceManager.getEngineRef()) {
43614365
if (newEngineReference.get() == null) {
43624366
throw new AlreadyClosedException("engine was closed");
43634367
}
@@ -4368,7 +4372,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
43684372

43694373
@Override
43704374
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4375+
try (var ignored = engineReferenceManager.getEngineRef()) {
43724376
if (newEngineReference.get() == null) {
43734377
throw new AlreadyClosedException("engine was closed");
43744378
}
@@ -4379,17 +4383,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
43794383
@Override
43804384
public void close() throws IOException {
43814385
Engine newEngine;
4382-
synchronized (engineMutex) {
4386+
try (var engineRef = engineReferenceManager.getEngineRef()) {
43834387
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4388+
if (newEngine == engineRef.getEngineOrNull()) {
43854389
// we successfully installed the new engine so do not close it.
43864390
newEngine = null;
43874391
}
43884392
}
43894393
IOUtils.close(super::close, newEngine);
43904394
}
43914395
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4396+
IOUtils.close(engineReferenceManager.getAndSet(() -> readOnlyEngine));
43934397
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944398
onNewEngine(newEngineReference.get());
43954399
}
@@ -4403,9 +4407,9 @@ public void close() throws IOException {
44034407
);
44044408
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054409
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4410+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
44074411
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4412+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngineReference.get()));
44094413
// We set active because we are now writing operations to the engine; this way,
44104414
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114415
active.set(true);
@@ -4516,4 +4520,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164520
l.onResponse(null);
45174521
}));
45184522
}
4523+
4524+
private boolean assertEngineReferenceIsNull(String message) {
4525+
try (var engineRef = engineReferenceManager.getEngineRef()) {
4526+
assert engineRef.getEngineOrNull() == null : message;
4527+
return true;
4528+
}
4529+
}
45194530
}

0 commit comments

Comments
 (0)