Skip to content

Commit b102065

Browse files
committed
add EngineReadWriteLock
1 parent 6942041 commit b102065

File tree

9 files changed

+111
-17
lines changed

9 files changed

+111
-17
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8484
config.getRelativeTimeInNanosSupplier(),
8585
config.getIndexCommitListener(),
8686
config.isPromotableToPrimary(),
87-
config.getMapperService()
87+
config.getMapperService(),
88+
config.getEngineLock()
8889
);
8990
}
9091

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
146146

147147
private final boolean promotableToPrimary;
148148

149+
private final EngineReadWriteLock engineLock;
150+
149151
/**
150152
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
151153
*/
@@ -177,7 +179,8 @@ public EngineConfig(
177179
LongSupplier relativeTimeInNanosSupplier,
178180
Engine.IndexCommitListener indexCommitListener,
179181
boolean promotableToPrimary,
180-
MapperService mapperService
182+
MapperService mapperService,
183+
EngineReadWriteLock engineLock
181184
) {
182185
this.shardId = shardId;
183186
this.indexSettings = indexSettings;
@@ -224,6 +227,7 @@ public EngineConfig(
224227
this.promotableToPrimary = promotableToPrimary;
225228
// always use compound on flush - reduces # of file-handles on refresh
226229
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
230+
this.engineLock = engineLock;
227231
}
228232

229233
/**
@@ -468,4 +472,8 @@ public boolean getUseCompoundFile() {
468472
public MapperService getMapperService() {
469473
return mapperService;
470474
}
475+
476+
public EngineReadWriteLock getEngineLock() {
477+
return engineLock;
478+
}
471479
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import java.util.concurrent.locks.Lock;
13+
import java.util.concurrent.locks.ReadWriteLock;
14+
import java.util.concurrent.locks.ReentrantReadWriteLock;
15+
16+
/**
17+
* Reentrant read/write lock used to guard engine changes in a shard.
18+
*
19+
* Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods in the future.
20+
*/
21+
public final class EngineReadWriteLock implements ReadWriteLock {
22+
23+
private final ReentrantReadWriteLock lock;
24+
25+
public EngineReadWriteLock() {
26+
this.lock = new ReentrantReadWriteLock();
27+
}
28+
29+
@Override
30+
public Lock writeLock() {
31+
return lock.writeLock();
32+
}
33+
34+
@Override
35+
public Lock readLock() {
36+
return lock.readLock();
37+
}
38+
39+
/**
40+
* See {@link ReentrantReadWriteLock#isWriteLocked()}
41+
*/
42+
public boolean isWriteLocked() {
43+
return lock.isWriteLocked();
44+
}
45+
46+
/**
47+
* See {@link ReentrantReadWriteLock#isWriteLockedByCurrentThread()}
48+
*/
49+
public boolean isWriteLockedByCurrentThread() {
50+
return lock.isWriteLockedByCurrentThread();
51+
}
52+
53+
/**
54+
* Returns {@code true} if the number of read locks held by any thread is greater than zero.
55+
* This method is designed for use in monitoring system state, not for synchronization control.
56+
*
57+
* @return {@code true} if any thread holds a read lock and {@code false} otherwise
58+
*/
59+
public boolean isReadLocked() {
60+
return lock.getReadLockCount() > 0;
61+
}
62+
63+
/**
64+
* Returns {@code true} if the number of holds on the read lock by the current thread is greater than zero.
65+
* This method is designed for use in monitoring system state, not for synchronization control.
66+
*
67+
* @return {@code true} if the number of holds on the read lock by the current thread is greater than zero, {@code false} otherwise
68+
*/
69+
public boolean isReadLockedByCurrentThread() {
70+
return lock.getReadHoldCount() > 0;
71+
}
72+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.elasticsearch.index.engine.EngineConfig;
9292
import org.elasticsearch.index.engine.EngineException;
9393
import org.elasticsearch.index.engine.EngineFactory;
94+
import org.elasticsearch.index.engine.EngineReadWriteLock;
9495
import org.elasticsearch.index.engine.ReadOnlyEngine;
9596
import org.elasticsearch.index.engine.RefreshFailedEngineException;
9697
import org.elasticsearch.index.engine.SafeCommitInfo;
@@ -182,7 +183,6 @@
182183
import java.util.concurrent.atomic.AtomicInteger;
183184
import java.util.concurrent.atomic.AtomicLong;
184185
import java.util.concurrent.atomic.AtomicReference;
185-
import java.util.concurrent.locks.ReentrantReadWriteLock;
186186
import java.util.function.BiConsumer;
187187
import java.util.function.Consumer;
188188
import java.util.function.Function;
@@ -249,7 +249,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
249249
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
250250

251251
// read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex)
252-
private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock();
252+
private final EngineReadWriteLock engineLock = new EngineReadWriteLock();
253253
private Engine currentEngine = null; // must be accessed while holding engineLock
254254
final EngineFactory engineFactory;
255255

@@ -1841,7 +1841,7 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac
18411841
engineLock.writeLock().unlock();
18421842
}
18431843
} finally {
1844-
assert engineLock.getReadHoldCount() > 0 : "hold the read lock when submitting the engine closing task";
1844+
assert engineLock.isReadLockedByCurrentThread() : "hold the read lock when submitting the engine closing task";
18451845
try {
18461846
final Engine engine = engineOrNull;
18471847
// When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread
@@ -3397,7 +3397,7 @@ public Engine getEngineOrNull() {
33973397
}
33983398

33993399
private Engine getCurrentEngine(boolean allowNoEngine) {
3400-
assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread();
3400+
assert engineLock.isReadLockedByCurrentThread() || engineLock.isWriteLockedByCurrentThread();
34013401
var engine = this.currentEngine;
34023402
if (engine == null && allowNoEngine == false) {
34033403
throw new AlreadyClosedException("engine is closed");
@@ -3735,7 +3735,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
37353735
relativeTimeInNanosSupplier,
37363736
indexCommitListener,
37373737
routingEntry().isPromotableToPrimary(),
3738-
mapperService()
3738+
mapperService(),
3739+
engineLock
37393740
);
37403741
}
37413742

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
36123612
config.getRelativeTimeInNanosSupplier(),
36133613
null,
36143614
true,
3615-
config.getMapperService()
3615+
config.getMapperService(),
3616+
config.getEngineLock()
36163617
);
36173618
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
36183619

@@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
71757176
config.getRelativeTimeInNanosSupplier(),
71767177
config.getIndexCommitListener(),
71777178
config.isPromotableToPrimary(),
7178-
config.getMapperService()
7179+
config.getMapperService(),
7180+
config.getEngineLock()
71797181
);
71807182
try (InternalEngine engine = createEngine(configWithWarmer)) {
71817183
assertThat(warmedUpReaders, empty());

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5060,7 +5060,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
50605060
config.getRelativeTimeInNanosSupplier(),
50615061
config.getIndexCommitListener(),
50625062
config.isPromotableToPrimary(),
5063-
config.getMapperService()
5063+
config.getMapperService(),
5064+
config.getEngineLock()
50645065
);
50655066
return new InternalEngine(configWithWarmer);
50665067
});

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.index.codec.CodecService;
4040
import org.elasticsearch.index.engine.Engine;
4141
import org.elasticsearch.index.engine.EngineConfig;
42+
import org.elasticsearch.index.engine.EngineReadWriteLock;
4243
import org.elasticsearch.index.engine.EngineTestCase;
4344
import org.elasticsearch.index.engine.InternalEngine;
4445
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
@@ -166,7 +167,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
166167
System::nanoTime,
167168
null,
168169
true,
169-
EngineTestCase.createMapperService()
170+
EngineTestCase.createMapperService(),
171+
new EngineReadWriteLock()
170172
);
171173
engine = new InternalEngine(config);
172174
EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE);

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi
304304
config.getRelativeTimeInNanosSupplier(),
305305
config.getIndexCommitListener(),
306306
config.isPromotableToPrimary(),
307-
config.getMapperService()
307+
config.getMapperService(),
308+
config.getEngineLock()
308309
);
309310
}
310311

@@ -337,7 +338,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
337338
config.getRelativeTimeInNanosSupplier(),
338339
config.getIndexCommitListener(),
339340
config.isPromotableToPrimary(),
340-
config.getMapperService()
341+
config.getMapperService(),
342+
config.getEngineLock()
341343
);
342344
}
343345

@@ -370,7 +372,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
370372
config.getRelativeTimeInNanosSupplier(),
371373
config.getIndexCommitListener(),
372374
config.isPromotableToPrimary(),
373-
config.getMapperService()
375+
config.getMapperService(),
376+
config.getEngineLock()
374377
);
375378
}
376379

@@ -875,7 +878,8 @@ public EngineConfig config(
875878
this::relativeTimeInNanos,
876879
indexCommitListener,
877880
true,
878-
mapperService
881+
mapperService,
882+
new EngineReadWriteLock()
879883
);
880884
}
881885

@@ -916,7 +920,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat
916920
config.getRelativeTimeInNanosSupplier(),
917921
config.getIndexCommitListener(),
918922
config.isPromotableToPrimary(),
919-
config.getMapperService()
923+
config.getMapperService(),
924+
config.getEngineLock()
920925
);
921926
}
922927

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
3333
import org.elasticsearch.index.engine.Engine;
3434
import org.elasticsearch.index.engine.EngineConfig;
35+
import org.elasticsearch.index.engine.EngineReadWriteLock;
3536
import org.elasticsearch.index.engine.EngineTestCase;
3637
import org.elasticsearch.index.engine.InternalEngine;
3738
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
@@ -273,7 +274,8 @@ public void onFailedEngine(String reason, Exception e) {
273274
System::nanoTime,
274275
null,
275276
true,
276-
mapperService
277+
mapperService,
278+
new EngineReadWriteLock()
277279
);
278280
}
279281

0 commit comments

Comments
 (0)