Skip to content

Commit 23ccb40

Browse files
committed
draft ES-10826
1 parent 1c368c7 commit 23ccb40

File tree

3 files changed

+159
-31
lines changed

3 files changed

+159
-31
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.apache.lucene.store.AlreadyClosedException;
13+
import org.elasticsearch.core.AbstractRefCounted;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.core.Releasable;
16+
import org.elasticsearch.index.engine.Engine;
17+
18+
/**
 * A closeable handle that pairs a (possibly null) {@link Engine} with a {@link Releasable}.
 * {@link #close()} drops one reference through {@code decRef()}, and {@link #closeInternal()} closes the wrapped releasable.
 */
public final class EngineRef extends AbstractRefCounted implements Releasable {
19+
20+
// closed from closeInternal()
private final Releasable releasable;
21+
// may be null, see getEngineOrNull()
private final Engine engine;
22+
23+
/**
 * @param engine     the engine to expose; may be null
 * @param releasable the resource closed by {@link #closeInternal()}
 */
public EngineRef(Engine engine, Releasable releasable) {
24+
this.engine = engine;
25+
this.releasable = releasable;
26+
}
27+
28+
/**
 * @return the referenced engine, or null if this reference was created without one
 */
@Nullable
29+
public Engine getEngineOrNull() {
30+
return engine;
31+
}
32+
33+
/**
 * @return the referenced engine, never null
 * @throws AlreadyClosedException if this reference holds no engine
 */
public Engine getEngine() {
34+
var engine = getEngineOrNull();
35+
if (engine == null) {
36+
throw new AlreadyClosedException("engine is closed");
37+
}
38+
return engine;
39+
}
40+
41+
// NOTE(review): presumably invoked by AbstractRefCounted once the reference count reaches zero - confirm against that class
@Override
42+
protected void closeInternal() {
43+
releasable.close();
44+
}
45+
46+
// closing the handle only releases one reference; the releasable itself is closed in closeInternal()
@Override
47+
public void close() {
48+
decRef();
49+
}
50+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Supplier;
19+
20+
/**
 * Holds the current {@link Engine} behind a fair {@link ReentrantReadWriteLock}: {@link #getEngineRef()} takes the read lock,
 * {@link #acquireEngineLock()} takes the write lock, and {@link #getEngineAndSet(Supplier)} swaps the engine.
 */
class EngineReferenceManager {
21+
22+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
23+
private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op
24+
private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op
25+
26+
// the current engine, possibly null; written by getEngineAndSet() and read without locking by get()
private volatile Engine current;
27+
28+
/**
29+
* @return a releasable reference to the current {@link Engine}, preventing it from being changed until the reference is released. Note
30+
* that the {@link Engine} referenced by the {@link EngineRef} may be null and may be closed at any time in case of an engine failure,
31+
* but is guaranteed to not be changed (and closed) by an engine reset. The read lock taken here is unlocked by the releasable handed
32+
* to the returned {@link EngineRef}.
*/
33+
public EngineRef getEngineRef() {
34+
lock.readLock().lock();
35+
return new EngineRef(this.current, Releasables.assertOnce(releaseLock));
36+
}
37+
38+
/**
39+
* Acquires a lock that prevents the {@link Engine} from being changed until the returned releasable is released.
40+
* @return a releasable that unlocks the write lock acquired by this call
41+
*/
42+
Releasable acquireEngineLock() {
43+
lock.writeLock().lock();
44+
return Releasables.assertOnce(releaseExclusiveLock);
45+
}
46+
47+
/**
 * @return true if the calling thread holds the write lock taken by {@link #acquireEngineLock()}
 */
boolean isEngineLockHeldByCurrentThread() {
48+
return lock.writeLock().isHeldByCurrentThread();
49+
}
50+
51+
/**
52+
* @return the (possibly null) current reference to the {@link Engine}
53+
*/
54+
@Nullable
55+
Engine get() {
56+
return this.current;
57+
}
58+
59+
/**
 * Replaces the current engine with the value produced by the supplier. Asserts that the calling thread holds the write lock.
 *
 * @param supplier provides the new engine; asserted to be non-null, but the value it supplies may be null
 * @return the (possibly null) engine that was current before the swap
 */
@Nullable
60+
Engine getEngineAndSet(Supplier<Engine> supplier) {
61+
assert supplier != null : "supplier cannot be null";
62+
assert isEngineLockHeldByCurrentThread();
63+
Engine previous = this.current;
64+
this.current = supplier.get();
65+
return previous;
66+
}
67+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240
// ensure happens-before relation between addRefreshListener() and postRecovery()
241241
private volatile SubscribableListener<Void> postRecoveryComplete;
242242
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
243+
244+
private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
245245
final EngineFactory engineFactory;
246246

247247
private final IndexingOperationListener indexingOperationListeners;
@@ -1277,10 +1277,13 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
12771277
if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) {
12781278
throw new IllegalStateException("get operations not allowed on a legacy index");
12791279
}
1280-
if (translogOnly) {
1281-
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1280+
try (var engineRef = engineReferenceManager.getEngineRef()) {
1281+
var engine = engineRef.getEngine();
1282+
if (translogOnly) {
1283+
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
1284+
}
1285+
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
12821286
}
1283-
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
12841287
}
12851288

12861289
/**
@@ -1613,10 +1616,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131616
Engine.IndexCommitRef indexCommit = null;
16141617
store.incRef();
16151618
try {
1616-
synchronized (engineMutex) {
1619+
try (var engineRef = engineReferenceManager.getEngineRef()) {
16171620
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181621
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619-
final Engine engine = getEngineOrNull();
1622+
final Engine engine = engineRef.getEngineOrNull();
16201623
if (engine != null) {
16211624
indexCommit = engine.acquireLastIndexCommit(false);
16221625
}
@@ -1776,14 +1779,14 @@ public CacheHelper getReaderCacheHelper() {
17761779
}
17771780

17781781
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1782+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
17801783
try {
17811784
synchronized (mutex) {
17821785
changeState(IndexShardState.CLOSED, reason);
17831786
}
17841787
checkAndCallWaitForEngineOrClosedShardListeners();
17851788
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1789+
final Engine engine = engineReferenceManager.getEngineAndSet(() -> null);
17871790
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881791
@Override
17891792
public void run() throws Exception {
@@ -1857,7 +1860,7 @@ public void prepareForIndexRecovery() {
18571860
throw new IndexShardNotRecoveringException(shardId, state);
18581861
}
18591862
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1863+
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
18611864
}
18621865

18631866
/**
@@ -1936,8 +1939,8 @@ private void doLocalRecovery(
19361939
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371940
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381941
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1942+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
1943+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
19411944
}
19421945
}, (recoveryCompleteListener, ignoredRef) -> {
19431946
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,13 +2170,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672170
+ recoveryState.getRecoverySource()
21682171
+ "] but got "
21692172
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2173+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
2174+
assert assertEngineReferenceIsNull("engine is running");
21722175
verifyNotClosed();
2173-
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2176+
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
21742177
final Engine newEngine = createEngine(config);
21752178
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2179+
var previous = engineReferenceManager.getEngineAndSet(() -> newEngine);
2180+
assert previous == null;
21772181
// We set active because we are now writing operations to the engine; this way,
21782182
// we can flush if we go idle after some time and become inactive.
21792183
active.set(true);
@@ -2241,7 +2245,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412245
}
22422246

22432247
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2248+
assert engineReferenceManager.isEngineLockHeldByCurrentThread();
22452249
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462250
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472251
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,9 +2256,9 @@ private void onNewEngine(Engine newEngine) {
22522256
*/
22532257
public void performRecoveryRestart() throws IOException {
22542258
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2259+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
22562260
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2261+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
22582262
resetRecoveryStage();
22592263
}
22602264
}
@@ -2264,7 +2268,7 @@ public void performRecoveryRestart() throws IOException {
22642268
*/
22652269
public void resetRecoveryStage() {
22662270
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2271+
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
22682272
if (state != IndexShardState.RECOVERING) {
22692273
throw new IndexShardNotRecoveringException(shardId, state);
22702274
}
@@ -3298,7 +3302,7 @@ Engine getEngine() {
32983302
* closed.
32993303
*/
33003304
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3305+
return engineReferenceManager.get();
33023306
}
33033307

33043308
public void startRecovery(
@@ -4312,11 +4316,11 @@ public void resetEngine() {
43124316
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134317
assert waitForEngineOrClosedShardListeners.isDone();
43144318
try {
4315-
synchronized (engineMutex) {
4319+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
43164320
verifyNotClosed();
43174321
getEngine().prepareForEngineReset();
43184322
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4323+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> newEngine));
43204324
onNewEngine(newEngine);
43214325
}
43224326
onSettingsChanged();
@@ -4342,7 +4346,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424346
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434347
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444348
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4349+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
43464350
verifyNotClosed();
43474351
// we must create both new read-only engine and new read-write engine under the engine lock to ensure snapshotStoreMetadata,
43484352
// acquireXXXCommit and close works.
@@ -4357,7 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574361
) {
43584362
@Override
43594363
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4364+
try (var ignored = engineReferenceManager.getEngineRef()) {
43614365
if (newEngineReference.get() == null) {
43624366
throw new AlreadyClosedException("engine was closed");
43634367
}
@@ -4368,7 +4372,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
43684372

43694373
@Override
43704374
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4375+
try (var ignored = engineReferenceManager.getEngineRef()) {
43724376
if (newEngineReference.get() == null) {
43734377
throw new AlreadyClosedException("engine was closed");
43744378
}
@@ -4379,17 +4383,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
43794383
@Override
43804384
public void close() throws IOException {
43814385
Engine newEngine;
4382-
synchronized (engineMutex) {
4386+
try (var engineRef = engineReferenceManager.getEngineRef()) {
43834387
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4388+
if (newEngine == engineRef.getEngineOrNull()) {
43854389
// we successfully installed the new engine so do not close it.
43864390
newEngine = null;
43874391
}
43884392
}
43894393
IOUtils.close(super::close, newEngine);
43904394
}
43914395
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4396+
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> readOnlyEngine));
43934397
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944398
onNewEngine(newEngineReference.get());
43954399
}
@@ -4403,9 +4407,9 @@ public void close() throws IOException {
44034407
);
44044408
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054409
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4410+
try (var ignored = engineReferenceManager.acquireEngineLock()) {
44074411
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4412+
IOUtils.close(engineReferenceManager.getEngineAndSet(newEngineReference::get));
44094413
// We set active because we are now writing operations to the engine; this way,
44104414
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114415
active.set(true);
@@ -4516,4 +4520,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164520
l.onResponse(null);
45174521
}));
45184522
}
4523+
4524+
/**
 * Asserts that the engine reference manager currently holds no engine.
 *
 * @param message the message attached to the assertion failure
 * @return always {@code true}, so that the call can itself be placed inside an {@code assert} statement
 */
private boolean assertEngineReferenceIsNull(String message) {
4525+
// use accessor with no lock as this asserting method can be called anywhere,
4526+
// including under the refresh lock of the index reader.
4527+
assert engineReferenceManager.get() == null : message;
4528+
return true;
4529+
}
45194530
}

0 commit comments

Comments
 (0)