Skip to content

Commit 782187e

Browse files
committed
ES-10826
1 parent 43eee87 commit 782187e

File tree

4 files changed

+137
-39
lines changed

4 files changed

+137
-39
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2347,7 +2347,7 @@ public record FlushResult(boolean flushPerformed, long generation) {
23472347
* in-progress operations and listeners (e.g., primary term and generation listeners).
23482348
* At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
23492349
*/
2350-
public void prepareForEngineReset() throws IOException {
2350+
public void beforeReset() throws IOException {
23512351
throw new UnsupportedOperationException("does not support engine reset");
23522352
}
23532353

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2227,8 +2227,10 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
22272227
preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
22282228
commitIndexWriter(indexWriter, translog);
22292229
logger.trace("finished commit for flush");
2230-
// we need to refresh in order to clear older version values
2231-
refresh("version_table_flush", SearcherScope.INTERNAL, true);
2230+
if (isClosing() == false) {
2231+
// we need to refresh in order to clear older version values
2232+
refresh("version_table_flush", SearcherScope.INTERNAL, true);
2233+
}
22322234
translog.trimUnreferencedReaders();
22332235
// Update the translog location for flushListener if (1) the writeLocation has changed during the flush and
22342236
// (2) indexWriter has committed all the changes (checks must be done in this order).

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 131 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
4646
import org.elasticsearch.cluster.routing.ShardRouting;
4747
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
48+
import org.elasticsearch.cluster.service.ClusterApplierService;
49+
import org.elasticsearch.cluster.service.MasterService;
4850
import org.elasticsearch.common.UUIDs;
4951
import org.elasticsearch.common.io.stream.BytesStreamOutput;
5052
import org.elasticsearch.common.lucene.Lucene;
@@ -152,6 +154,7 @@
152154
import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader;
153155
import org.elasticsearch.search.suggest.completion.CompletionStats;
154156
import org.elasticsearch.threadpool.ThreadPool;
157+
import org.elasticsearch.transport.Transports;
155158

156159
import java.io.Closeable;
157160
import java.io.IOException;
@@ -178,6 +181,7 @@
178181
import java.util.concurrent.atomic.AtomicInteger;
179182
import java.util.concurrent.atomic.AtomicLong;
180183
import java.util.concurrent.atomic.AtomicReference;
184+
import java.util.concurrent.locks.ReentrantReadWriteLock;
181185
import java.util.function.BiConsumer;
182186
import java.util.function.Consumer;
183187
import java.util.function.Function;
@@ -240,8 +244,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240244
// ensure happens-before relation between addRefreshListener() and postRecovery()
241245
private volatile SubscribableListener<Void> postRecoveryComplete;
242246
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
243-
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
244-
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
247+
248+
private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); // lock ordering: engineLock.writeLock -> mutex
249+
private Engine currentEngine = null; // must be accessed while holding engineLock
245250
final EngineFactory engineFactory;
246251

247252
private final IndexingOperationListener indexingOperationListeners;
@@ -1613,7 +1618,8 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131618
Engine.IndexCommitRef indexCommit = null;
16141619
store.incRef();
16151620
try {
1616-
synchronized (engineMutex) {
1621+
engineLock.readLock().lock();
1622+
try {
16171623
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181624
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
16191625
final Engine engine = getEngineOrNull();
@@ -1623,6 +1629,8 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16231629
if (indexCommit == null) {
16241630
return store.getMetadata(null, true);
16251631
}
1632+
} finally {
1633+
engineLock.readLock().unlock();
16261634
}
16271635
return store.getMetadata(indexCommit.getIndexCommit());
16281636
} finally {
@@ -1776,14 +1784,15 @@ public CacheHelper getReaderCacheHelper() {
17761784
}
17771785

17781786
public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
1779-
synchronized (engineMutex) {
1787+
engineLock.writeLock().lock();
1788+
try {
17801789
try {
17811790
synchronized (mutex) {
17821791
changeState(IndexShardState.CLOSED, reason);
17831792
}
17841793
checkAndCallWaitForEngineOrClosedShardListeners();
17851794
} finally {
1786-
final Engine engine = this.currentEngineReference.getAndSet(null);
1795+
final Engine engine = getAndSetCurrentEngine(null);
17871796
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
17881797
@Override
17891798
public void run() throws Exception {
@@ -1810,6 +1819,8 @@ public String toString() {
18101819
}
18111820
}));
18121821
}
1822+
} finally {
1823+
engineLock.writeLock().unlock();
18131824
}
18141825
}
18151826

@@ -1857,7 +1868,7 @@ public void prepareForIndexRecovery() {
18571868
throw new IndexShardNotRecoveringException(shardId, state);
18581869
}
18591870
recoveryState.setStage(RecoveryState.Stage.INDEX);
1860-
assert currentEngineReference.get() == null;
1871+
assert this.currentEngine == null;
18611872
}
18621873

18631874
/**
@@ -1936,8 +1947,11 @@ private void doLocalRecovery(
19361947
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371948
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
19381949
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
1939-
synchronized (engineMutex) {
1940-
IOUtils.close(currentEngineReference.getAndSet(null));
1950+
engineLock.writeLock().lock();
1951+
try {
1952+
IOUtils.close(getAndSetCurrentEngine(null));
1953+
} finally {
1954+
engineLock.writeLock().unlock();
19411955
}
19421956
}, (recoveryCompleteListener, ignoredRef) -> {
19431957
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
@@ -2167,16 +2181,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672181
+ recoveryState.getRecoverySource()
21682182
+ "] but got "
21692183
+ getRetentionLeases();
2170-
synchronized (engineMutex) {
2171-
assert currentEngineReference.get() == null : "engine is running";
2184+
engineLock.writeLock().lock();
2185+
try {
2186+
assert currentEngine == null : "engine is running";
21722187
verifyNotClosed();
21732188
// we must create a new engine under the engineLock write lock (see IndexShard#snapshotStoreMetadata).
21742189
final Engine newEngine = createEngine(config);
21752190
onNewEngine(newEngine);
2176-
currentEngineReference.set(newEngine);
2191+
getAndSetCurrentEngine(newEngine);
21772192
// We set active because we are now writing operations to the engine; this way,
21782193
// we can flush if we go idle after some time and become inactive.
21792194
active.set(true);
2195+
} finally {
2196+
engineLock.writeLock().unlock();
21802197
}
21812198
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
21822199
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -2241,7 +2258,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412258
}
22422259

22432260
private void onNewEngine(Engine newEngine) {
2244-
assert Thread.holdsLock(engineMutex);
2261+
assert engineLock.isWriteLockedByCurrentThread();
22452262
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
22462263
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
22472264
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
@@ -2252,10 +2269,13 @@ private void onNewEngine(Engine newEngine) {
22522269
*/
22532270
public void performRecoveryRestart() throws IOException {
22542271
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
2255-
synchronized (engineMutex) {
2272+
engineLock.writeLock().lock();
2273+
try {
22562274
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
2257-
IOUtils.close(currentEngineReference.getAndSet(null));
2275+
IOUtils.close(getAndSetCurrentEngine(null));
22582276
resetRecoveryStage();
2277+
} finally {
2278+
engineLock.writeLock().unlock();
22592279
}
22602280
}
22612281

@@ -2264,7 +2284,7 @@ public void performRecoveryRestart() throws IOException {
22642284
*/
22652285
public void resetRecoveryStage() {
22662286
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
2267-
assert currentEngineReference.get() == null;
2287+
assert this.currentEngine == null;
22682288
if (state != IndexShardState.RECOVERING) {
22692289
throw new IndexShardNotRecoveringException(shardId, state);
22702290
}
@@ -2593,9 +2613,20 @@ boolean shouldRollTranslogGeneration() {
25932613
}
25942614

25952615
public void onSettingsChanged() {
2596-
Engine engineOrNull = getEngineOrNull();
2597-
if (engineOrNull != null) {
2598-
engineOrNull.onSettingsChanged();
2616+
// This method can be called within the cluster state applier thread
2617+
if (engineLock.readLock().tryLock() == false) {
2618+
// Attempt to acquire a read lock failed:
2619+
// - the engine is closing, in which case we don't need to apply the updated index settings
2620+
// - otherwise the onSettingsChanged() should be called again after the new engine is created and the write lock is released
2621+
return;
2622+
}
2623+
try {
2624+
var engineOrNull = getCurrentEngine(true);
2625+
if (engineOrNull != null) {
2626+
engineOrNull.onSettingsChanged();
2627+
}
2628+
} finally {
2629+
engineLock.readLock().unlock();
25992630
}
26002631
}
26012632

@@ -3286,19 +3317,66 @@ private void doCheckIndex() throws IOException {
32863317
}
32873318

32883319
Engine getEngine() {
3289-
Engine engine = getEngineOrNull();
3290-
if (engine == null) {
3291-
throw new AlreadyClosedException("engine is closed");
3320+
engineLock.readLock().lock();
3321+
try {
3322+
return getCurrentEngine(false);
3323+
} finally {
3324+
engineLock.readLock().unlock();
32923325
}
3293-
return engine;
32943326
}
32953327

32963328
/**
32973329
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
32983330
* closed.
32993331
*/
33003332
public Engine getEngineOrNull() {
3301-
return this.currentEngineReference.get();
3333+
engineLock.readLock().lock();
3334+
try {
3335+
return getCurrentEngine(true);
3336+
} finally {
3337+
engineLock.readLock().unlock();
3338+
}
3339+
}
3340+
3341+
/**
 * Returns the current engine instance. The calling thread is expected to hold either the read lock or the write lock of
 * {@code engineLock}; this is checked with an assertion.
 *
 * @param allowNoEngine if {@code true}, a {@code null} engine is returned as is; if {@code false}, a {@code null} engine is rejected
 * @return the current engine, or {@code null} if there is no engine and {@code allowNoEngine} is {@code true}
 * @throws AlreadyClosedException if there is no current engine and {@code allowNoEngine} is {@code false}
 */
private Engine getCurrentEngine(boolean allowNoEngine) {
3342+
assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread();
3343+
var engine = this.currentEngine;
3344+
if (engine == null && allowNoEngine == false) {
3345+
throw new AlreadyClosedException("engine is closed");
3346+
}
3347+
return engine;
3348+
}
3349+
3350+
/**
 * Replaces the current engine with {@code newEngine} and returns the engine that was previously set. The calling thread is
 * expected to hold the write lock of {@code engineLock}; this is checked with an assertion. This method does not close the
 * previous engine, that is left to the caller.
 *
 * @param newEngine the engine to install as the current engine, or {@code null} to clear the current engine
 * @return the previously installed engine, or {@code null} if there was none
 */
private Engine getAndSetCurrentEngine(Engine newEngine) {
3351+
assert engineLock.isWriteLockedByCurrentThread();
3352+
var previousEngine = this.currentEngine;
3353+
this.currentEngine = newEngine;
3354+
return previousEngine;
3355+
}
3356+
3357+
/**
3358+
* Executes an operation while preventing the shard's engine instance from being changed or closed during the execution.
3359+
* The parameter {@code allowNoEngine} allows the operation to be executed with a null engine instance.
3360+
* When {@code allowNoEngine} is {@code false}, the method throws an {@link AlreadyClosedException} if the current engine is null.
3361+
*
3362+
* @param operation the operation to execute with the current engine; must not be null
3363+
* @param allowNoEngine whether the operation may be executed with a {@code null} engine
3364+
* @param <R> the type of the result returned by the operation
3365+
* @return the value returned by {@code operation}
3366+
*/
3367+
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
3368+
assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block");
3369+
assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block");
3370+
assert Transports.assertNotTransportThread("IndexShard.withEngine() can block");
3371+
assert operation != null;
3372+
3373+
// the read lock is held for the whole operation; replacing the engine requires the write lock (see getAndSetCurrentEngine)
engineLock.readLock().lock();
3374+
try {
3375+
var engine = getCurrentEngine(allowNoEngine);
3376+
return operation.apply(engine);
3377+
} finally {
3378+
engineLock.readLock().unlock();
3379+
}
33023380
}
33033381

33043382
public void startRecovery(
@@ -4302,7 +4380,7 @@ public void afterRefresh(boolean didRefresh) {
43024380
/**
43034381
* Reset the current engine to a new one.
43044382
*
4305-
* Calls {@link Engine#prepareForEngineReset()} on the current engine, then closes it, and loads a new engine without
4383+
* Calls {@link Engine#beforeReset()} on the current engine, then closes it, and loads a new engine without
43064384
* doing any translog recovery.
43074385
*
43084386
* In general, resetting the engine should be done with care, to consider any in-progress operations and listeners.
@@ -4312,12 +4390,15 @@ public void resetEngine() {
43124390
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
43134391
assert waitForEngineOrClosedShardListeners.isDone();
43144392
try {
4315-
synchronized (engineMutex) {
4393+
engineLock.writeLock().lock(); // might already be held
4394+
try {
43164395
verifyNotClosed();
4317-
getEngine().prepareForEngineReset();
4396+
currentEngine.beforeReset();
43184397
var newEngine = createEngine(newEngineConfig(replicationTracker));
4319-
IOUtils.close(currentEngineReference.getAndSet(newEngine));
4398+
IOUtils.close(getAndSetCurrentEngine(newEngine));
43204399
onNewEngine(newEngine);
4400+
} finally {
4401+
engineLock.writeLock().unlock();
43214402
}
43224403
onSettingsChanged();
43234404
} catch (Exception e) {
@@ -4342,7 +4423,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424423
SetOnce<Engine> newEngineReference = new SetOnce<>();
43434424
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
43444425
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
4345-
synchronized (engineMutex) {
4426+
engineLock.writeLock().lock();
4427+
try {
43464428
verifyNotClosed();
43474429
// we must create both new read-only engine and new read-write engine under the engineLock write lock to ensure snapshotStoreMetadata,
43484430
// acquireXXXCommit and close works.
@@ -4357,41 +4439,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574439
) {
43584440
@Override
43594441
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
4360-
synchronized (engineMutex) {
4442+
engineLock.readLock().lock();
4443+
try {
43614444
if (newEngineReference.get() == null) {
43624445
throw new AlreadyClosedException("engine was closed");
43634446
}
43644447
// ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
43654448
return newEngineReference.get().acquireLastIndexCommit(false);
4449+
} finally {
4450+
engineLock.readLock().unlock();
43664451
}
43674452
}
43684453

43694454
@Override
43704455
public IndexCommitRef acquireSafeIndexCommit() {
4371-
synchronized (engineMutex) {
4456+
engineLock.readLock().lock();
4457+
try {
43724458
if (newEngineReference.get() == null) {
43734459
throw new AlreadyClosedException("engine was closed");
43744460
}
43754461
return newEngineReference.get().acquireSafeIndexCommit();
4462+
} finally {
4463+
engineLock.readLock().unlock();
43764464
}
43774465
}
43784466

43794467
@Override
43804468
public void close() throws IOException {
43814469
Engine newEngine;
4382-
synchronized (engineMutex) {
4470+
engineLock.readLock().lock();
4471+
try {
43834472
newEngine = newEngineReference.get();
4384-
if (newEngine == currentEngineReference.get()) {
4473+
if (newEngine == getCurrentEngine(true)) {
43854474
// we successfully installed the new engine so do not close it.
43864475
newEngine = null;
43874476
}
4477+
} finally {
4478+
engineLock.readLock().unlock();
43884479
}
43894480
IOUtils.close(super::close, newEngine);
43904481
}
43914482
};
4392-
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
4483+
IOUtils.close(getAndSetCurrentEngine(readOnlyEngine));
43934484
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
43944485
onNewEngine(newEngineReference.get());
4486+
} finally {
4487+
engineLock.writeLock().unlock();
43954488
}
43964489
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
43974490
engine,
@@ -4403,12 +4496,15 @@ public void close() throws IOException {
44034496
);
44044497
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
44054498
newEngineReference.get().refresh("reset_engine");
4406-
synchronized (engineMutex) {
4499+
engineLock.writeLock().lock();
4500+
try {
44074501
verifyNotClosed();
4408-
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
4502+
IOUtils.close(getAndSetCurrentEngine(newEngineReference.get()));
44094503
// We set active because we are now writing operations to the engine; this way,
44104504
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114505
active.set(true);
4506+
} finally {
4507+
engineLock.writeLock().unlock();
44124508
}
44134509
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
44144510
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.

0 commit comments

Comments
 (0)