Skip to content

Commit 49ab337

Browse files
committed
Draft ES-10826
1 parent c211040 commit 49ab337

File tree

3 files changed

+151
-31
lines changed

3 files changed

+151
-31
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.AbstractRefCounted;
13+
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
public final class EngineRef extends AbstractRefCounted implements Releasable {
18+
19+
private final Releasable releasable;
20+
private final Engine engine;
21+
22+
public EngineRef(Engine engine, Releasable releasable) {
23+
this.engine = engine;
24+
this.releasable = releasable;
25+
}
26+
27+
@Nullable
28+
public Engine getEngineOrNull() {
29+
return engine;
30+
}
31+
32+
@Override
33+
protected void closeInternal() {
34+
releasable.close();
35+
}
36+
37+
@Override
38+
public void close() {
39+
decRef();
40+
}
41+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Supplier;
19+
20+
class EngineReferenceManager {
21+
22+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
23+
private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op
24+
private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op
25+
26+
private volatile Engine current;
27+
28+
private Releasable acquireLock() {
29+
lock.readLock().lock();
30+
return Releasables.assertOnce(releaseLock);
31+
}
32+
33+
public Releasable acquireExclusiveLock() {
34+
lock.writeLock().lock();
35+
return Releasables.assertOnce(releaseExclusiveLock);
36+
}
37+
38+
public boolean isExclusiveLockHeldByCurrentThread() {
39+
return lock.writeLock().isHeldByCurrentThread();
40+
}
41+
42+
public EngineRef getEngineRef() {
43+
try (var ignored = acquireLock()) {
44+
return new EngineRef(this.current, acquireLock());
45+
}
46+
}
47+
48+
public Engine getEngineNoLock() {
49+
return this.current;
50+
}
51+
52+
@Nullable
53+
public Engine getAndSet(Supplier<Engine> supplier) {
54+
assert supplier != null : "supplier cannot be null";
55+
assert isExclusiveLockHeldByCurrentThread();
56+
Engine previous = this.current;
57+
this.current = supplier.get();
58+
return previous;
59+
}
60+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240
// ensure happens-before relation between addRefreshListener() and postRecovery()
241241
private volatile SubscribableListener<Void> postRecoveryComplete;
242242
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
243+
244+
private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
245245
final EngineFactory engineFactory;
246246

247247
private final IndexingOperationListener indexingOperationListeners;
@@ -1613,10 +1613,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131613
Engine.IndexCommitRef indexCommit = null;
16141614
store.incRef();
16151615
try {
1616-
synchronized (engineMutex) {
1616+
try (var engineRef = engineReferenceManager.getEngineRef()) {
16171617
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181618
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619-
final Engine engine = getEngineOrNull();
1619+
final Engine engine = engineRef.getEngineOrNull();
16201620
if (engine != null) {
16211621
indexCommit = engine.acquireLastIndexCommit(false);
16221622
}
@@ -1776,14 +1776,14 @@ public CacheHelper getReaderCacheHelper() {
17761776
}
17771777

17781778
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1779+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
17801780
try {
17811781
synchronized (mutex) {
17821782
changeState(IndexShardState.CLOSED, reason);
17831783
}
17841784
checkAndCallWaitForEngineOrClosedShardListeners();
17851785
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1786+
final Engine engine = engineReferenceManager.getAndSet(() -> null);
17871787
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881788
@Override
17891789
public void run() throws Exception {
@@ -1857,7 +1857,7 @@ public void prepareForIndexRecovery() {
18571857
throw new IndexShardNotRecoveringException(shardId, state);
18581858
}
18591859
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1860+
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
18611861
}
18621862

18631863
/**
@@ -1936,8 +1936,8 @@ private void doLocalRecovery(
19361936
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371937
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381938
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1939+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
1940+
IOUtils.close(engineReferenceManager.getAndSet(() -> null));
19411941
}
19421942
}, (recoveryCompleteListener, ignoredRef) -> {
19431943
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,13 +2167,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672167
+ recoveryState.getRecoverySource()
21682168
+ "] but got "
21692169
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2170+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
2171+
assert assertEngineReferenceIsNull("engine is running");
21722172
verifyNotClosed();
2173-
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2173+
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
21742174
final Engine newEngine = createEngine(config);
21752175
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2176+
var previous = engineReferenceManager.getAndSet(() -> newEngine);
2177+
assert previous == null;
21772178
// We set active because we are now writing operations to the engine; this way,
21782179
// we can flush if we go idle after some time and become inactive.
21792180
active.set(true);
@@ -2241,7 +2242,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412242
}
22422243

22432244
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2245+
assert engineReferenceManager.isExclusiveLockHeldByCurrentThread();
22452246
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462247
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472248
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,9 +2253,9 @@ private void onNewEngine(Engine newEngine) {
22522253
*/
22532254
public void performRecoveryRestart() throws IOException {
22542255
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2256+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
22562257
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2258+
IOUtils.close(engineReferenceManager.getAndSet(() -> null));
22582259
resetRecoveryStage();
22592260
}
22602261
}
@@ -2264,7 +2265,7 @@ public void performRecoveryRestart() throws IOException {
22642265
*/
22652266
public void resetRecoveryStage() {
22662267
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2268+
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
22682269
if (state != IndexShardState.RECOVERING) {
22692270
throw new IndexShardNotRecoveringException(shardId, state);
22702271
}
@@ -3293,12 +3294,23 @@ Engine getEngine() {
32933294
return engine;
32943295
}
32953296

3297+
Engine getEngineNoLock() {
3298+
Engine engine = engineReferenceManager.getEngineNoLock();
3299+
if (engine == null) {
3300+
throw new AlreadyClosedException("engine is closed");
3301+
}
3302+
return engine;
3303+
}
3304+
32963305
/**
32973306
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
32983307
* closed.
32993308
*/
3309+
@Deprecated
33003310
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3311+
try (var engineRef = engineReferenceManager.getEngineRef()) {
3312+
return engineRef.getEngineOrNull();
3313+
}
33023314
}
33033315

33043316
public void startRecovery(
@@ -4088,7 +4100,7 @@ private class RefreshPendingLocationListener implements ReferenceManager.Refresh
40884100
@Override
40894101
public void beforeRefresh() {
40904102
try {
4091-
lastWriteLocation = getEngine().getTranslogLastWriteLocation();
4103+
lastWriteLocation = getEngineNoLock().getTranslogLastWriteLocation();
40924104
} catch (AlreadyClosedException exc) {
40934105
// shard is closed - no location is fine
40944106
lastWriteLocation = null;
@@ -4116,7 +4128,7 @@ public void beforeRefresh() {}
41164128
@Override
41174129
public void afterRefresh(boolean didRefresh) {
41184130
if (enableFieldHasValue && (didRefresh || fieldInfos == FieldInfos.EMPTY)) {
4119-
try (Engine.Searcher hasValueSearcher = getEngine().acquireSearcher("field_has_value")) {
4131+
try (Engine.Searcher hasValueSearcher = getEngineNoLock().acquireSearcher("field_has_value")) {
41204132
setFieldInfos(FieldInfos.getMergedFieldInfos(hasValueSearcher.getIndexReader()));
41214133
} catch (AlreadyClosedException ignore) {
41224134
// engine is closed - no updated FieldInfos is fine
@@ -4142,7 +4154,7 @@ public void beforeRefresh() {
41424154
@Override
41434155
public void afterRefresh(boolean didRefresh) {
41444156
if (shardFieldStats == null || didRefresh) {
4145-
try (var searcher = getEngine().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) {
4157+
try (var searcher = getEngineNoLock().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) {
41464158
int numSegments = 0;
41474159
int totalFields = 0;
41484160
long usages = 0;
@@ -4312,11 +4324,11 @@ public void resetEngine() {
43124324
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134325
assert waitForEngineOrClosedShardListeners.isDone();
43144326
try {
4315-
synchronized (engineMutex) {
4327+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43164328
verifyNotClosed();
43174329
getEngine().prepareForEngineReset();
43184330
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4331+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngine));
43204332
onNewEngine(newEngine);
43214333
}
43224334
onSettingsChanged();
@@ -4342,7 +4354,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424354
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434355
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444356
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4357+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43464358
verifyNotClosed();
43474359
// we must create both new read-only engine and new read-write engine under the exclusive engine lock to ensure snapshotStoreMetadata,
43484360
// acquireXXXCommit and close works.
@@ -4357,7 +4369,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574369
) {
43584370
@Override
43594371
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4372+
try (var ignored = engineReferenceManager.getEngineRef()) {
43614373
if (newEngineReference.get() == null) {
43624374
throw new AlreadyClosedException("engine was closed");
43634375
}
@@ -4368,7 +4380,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
43684380

43694381
@Override
43704382
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4383+
try (var ignored = engineReferenceManager.getEngineRef()) {
43724384
if (newEngineReference.get() == null) {
43734385
throw new AlreadyClosedException("engine was closed");
43744386
}
@@ -4379,17 +4391,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
43794391
@Override
43804392
public void close() throws IOException {
43814393
Engine newEngine;
4382-
synchronized (engineMutex) {
4394+
try (var engineRef = engineReferenceManager.getEngineRef()) {
43834395
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4396+
if (newEngine == engineRef.getEngineOrNull()) {
43854397
// we successfully installed the new engine so do not close it.
43864398
newEngine = null;
43874399
}
43884400
}
43894401
IOUtils.close(super::close, newEngine);
43904402
}
43914403
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4404+
IOUtils.close(engineReferenceManager.getAndSet(() -> readOnlyEngine));
43934405
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944406
onNewEngine(newEngineReference.get());
43954407
}
@@ -4403,9 +4415,9 @@ public void close() throws IOException {
44034415
);
44044416
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054417
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4418+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
44074419
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4420+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngineReference.get()));
44094421
// We set active because we are now writing operations to the engine; this way,
44104422
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114423
active.set(true);
@@ -4516,4 +4528,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164528
l.onResponse(null);
45174529
}));
45184530
}
4531+
4532+
private boolean assertEngineReferenceIsNull(String message) {
4533+
try (var engineRef = engineReferenceManager.getEngineRef()) {
4534+
assert engineRef.getEngineOrNull() == null : message;
4535+
return true;
4536+
}
4537+
}
45194538
}

0 commit comments

Comments
 (0)