Skip to content

Commit 7f40f81

Browse files
committed
Draft ES-10826
1 parent fb75dc4 commit 7f40f81

File tree

3 files changed

+149
-32
lines changed

3 files changed

+149
-32
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.AbstractRefCounted;
13+
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
public final class EngineRef extends AbstractRefCounted implements Releasable {
18+
19+
private final Releasable releasable;
20+
private final Engine engine;
21+
22+
public EngineRef(Engine engine, Releasable releasable) {
23+
this.engine = engine;
24+
this.releasable = releasable;
25+
}
26+
27+
@Nullable
28+
public Engine getEngineOrNull() {
29+
return engine;
30+
}
31+
32+
@Override
33+
protected void closeInternal() {
34+
releasable.close();
35+
}
36+
37+
@Override
38+
public void close() {
39+
decRef();
40+
}
41+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.shard;
11+
12+
import org.elasticsearch.core.Nullable;
13+
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.core.Releasables;
15+
import org.elasticsearch.index.engine.Engine;
16+
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Supplier;
19+
20+
class EngineReferenceManager {
21+
22+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
23+
private final Releasable releaseExclusiveLock = () -> lock.writeLock().unlock(); // reuse this to avoid allocation for each op
24+
private final Releasable releaseLock = () -> lock.readLock().unlock(); // reuse this to avoid allocation for each op
25+
26+
private volatile Engine current;
27+
28+
private Releasable acquireLock() {
29+
lock.readLock().lock();
30+
return Releasables.assertOnce(releaseLock);
31+
}
32+
33+
public Releasable acquireExclusiveLock() {
34+
lock.writeLock().lock();
35+
return Releasables.assertOnce(releaseExclusiveLock);
36+
}
37+
38+
public boolean isExclusiveLockHeldByCurrentThread() {
39+
return lock.writeLock().isHeldByCurrentThread();
40+
}
41+
42+
public EngineRef getEngineRef() {
43+
try (var ignored = acquireLock()) {
44+
return new EngineRef(this.current, acquireLock());
45+
}
46+
}
47+
48+
@Nullable
49+
private Engine getAndSetExclusiveLock(Supplier<Engine> supplier) {
50+
Engine previous = null;
51+
try (var ignored = acquireExclusiveLock()) {
52+
previous = getAndSet(supplier);
53+
}
54+
return previous;
55+
}
56+
57+
@Nullable
58+
public Engine getAndSet(Supplier<Engine> supplier) {
59+
assert isExclusiveLockHeldByCurrentThread();
60+
Engine previous = this.current;
61+
this.current = supplier.get();
62+
return previous;
63+
}
64+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240
// ensure happens-before relation between addRefreshListener() and postRecovery()
241241
private volatile SubscribableListener<Void> postRecoveryComplete;
242242
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
243+
244+
private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
245245
final EngineFactory engineFactory;
246246

247247
private final IndexingOperationListener indexingOperationListeners;
@@ -1613,10 +1613,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131613
Engine.IndexCommitRef indexCommit = null;
16141614
store.incRef();
16151615
try {
1616-
synchronized (engineMutex) {
1616+
try (var engineRef = engineReferenceManager.getEngineRef()) {
16171617
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181618
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619-
final Engine engine = getEngineOrNull();
1619+
final Engine engine = engineRef.getEngineOrNull();
16201620
if (engine != null) {
16211621
indexCommit = engine.acquireLastIndexCommit(false);
16221622
}
@@ -1776,14 +1776,14 @@ public CacheHelper getReaderCacheHelper() {
17761776
}
17771777

17781778
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1779+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
17801780
try {
17811781
synchronized (mutex) {
17821782
changeState(IndexShardState.CLOSED, reason);
17831783
}
17841784
checkAndCallWaitForEngineOrClosedShardListeners();
17851785
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1786+
final Engine engine = engineReferenceManager.getAndSet(() -> null);
17871787
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881788
@Override
17891789
public void run() throws Exception {
@@ -1857,7 +1857,7 @@ public void prepareForIndexRecovery() {
18571857
throw new IndexShardNotRecoveringException(shardId, state);
18581858
}
18591859
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1860+
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
18611861
}
18621862

18631863
/**
@@ -1936,8 +1936,8 @@ private void doLocalRecovery(
19361936
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371937
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381938
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1939+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
1940+
IOUtils.close(engineReferenceManager.getAndSet(null));
19411941
}
19421942
}, (recoveryCompleteListener, ignoredRef) -> {
19431943
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,13 +2167,13 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672167
+ recoveryState.getRecoverySource()
21682168
+ "] but got "
21692169
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2170+
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
2171+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
2172+
assert assertEngineReferenceIsNull("engine is running");
21722173
verifyNotClosed();
2173-
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21742174
final Engine newEngine = createEngine(config);
21752175
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2176+
engineReferenceManager.getAndSet(() -> newEngine);
21772177
// We set active because we are now writing operations to the engine; this way,
21782178
// we can flush if we go idle after some time and become inactive.
21792179
active.set(true);
@@ -2241,7 +2241,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412241
}
22422242

22432243
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2244+
assert engineReferenceManager.isExclusiveLockHeldByCurrentThread();
22452245
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462246
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472247
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,9 +2252,9 @@ private void onNewEngine(Engine newEngine) {
22522252
*/
22532253
public void performRecoveryRestart() throws IOException {
22542254
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2255+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
22562256
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2257+
IOUtils.close(engineReferenceManager.getAndSet(() -> null));
22582258
resetRecoveryStage();
22592259
}
22602260
}
@@ -2264,7 +2264,7 @@ public void performRecoveryRestart() throws IOException {
22642264
*/
22652265
public void resetRecoveryStage() {
22662266
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2267+
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
22682268
if (state != IndexShardState.RECOVERING) {
22692269
throw new IndexShardNotRecoveringException(shardId, state);
22702270
}
@@ -3297,8 +3297,11 @@ Engine getEngine() {
32973297
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
32983298
* closed.
32993299
*/
3300+
@Deprecated
33003301
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3302+
try (var engineRef = engineReferenceManager.getEngineRef()) {
3303+
return engineRef.getEngineOrNull();
3304+
}
33023305
}
33033306

33043307
public void startRecovery(
@@ -4312,11 +4315,11 @@ public void resetEngine() {
43124315
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134316
assert waitForEngineOrClosedShardListeners.isDone();
43144317
try {
4315-
synchronized (engineMutex) {
4318+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43164319
verifyNotClosed();
43174320
getEngine().prepareForEngineReset();
43184321
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4322+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngine));
43204323
onNewEngine(newEngine);
43214324
}
43224325
onSettingsChanged();
@@ -4342,7 +4345,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424345
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434346
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444347
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4348+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
43464349
verifyNotClosed();
43474350
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
43484351
// acquireXXXCommit and close works.
@@ -4357,39 +4360,41 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574360
) {
43584361
@Override
43594362
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4361-
if (newEngineReference.get() == null) {
4363+
try (var engineRef = engineReferenceManager.getEngineRef()) {
4364+
var engine = engineRef.getEngineOrNull();
4365+
if (engine == null) {
43624366
throw new AlreadyClosedException("engine was closed");
43634367
}
43644368
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
4365-
return newEngineReference.get().acquireLastIndexCommit(false);
4369+
return engine.acquireLastIndexCommit(false);
43664370
}
43674371
}
43684372

43694373
@Override
43704374
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4372-
if (newEngineReference.get() == null) {
4375+
try (var engineRef = engineReferenceManager.getEngineRef()) {
4376+
var engine = engineRef.getEngineOrNull();
4377+
if (engine == null) {
43734378
throw new AlreadyClosedException("engine was closed");
43744379
}
4375-
return newEngineReference.get().acquireSafeIndexCommit();
4380+
return engine.acquireSafeIndexCommit();
43764381
}
43774382
}
43784383

43794384
@Override
43804385
public void close() throws IOException {
43814386
Engine newEngine;
4382-
synchronized (engineMutex) {
4387+
try (var engineRef = engineReferenceManager.getEngineRef()) {
43834388
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4389+
if (newEngine == engineRef.getEngineOrNull()) {
43854390
// we successfully installed the new engine so do not close it.
43864391
newEngine = null;
43874392
}
43884393
}
43894394
IOUtils.close(super::close, newEngine);
43904395
}
43914396
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4397+
IOUtils.close(engineReferenceManager.getAndSet(() -> readOnlyEngine));
43934398
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944399
onNewEngine(newEngineReference.get());
43954400
}
@@ -4403,9 +4408,9 @@ public void close() throws IOException {
44034408
);
44044409
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054410
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4411+
try (var ignored = engineReferenceManager.acquireExclusiveLock()) {
44074412
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4413+
IOUtils.close(engineReferenceManager.getAndSet(() -> newEngineReference.get()));
44094414
// We set active because we are now writing operations to the engine; this way,
44104415
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114416
active.set(true);
@@ -4516,4 +4521,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164521
l.onResponse(null);
45174522
}));
45184523
}
4524+
4525+
private boolean assertEngineReferenceIsNull(String message) {
4526+
try (var engineRef = engineReferenceManager.getEngineRef()) {
4527+
assert engineRef.getEngineOrNull() == null : message;
4528+
return true;
4529+
}
4530+
}
45194531
}

0 commit comments

Comments
 (0)