Skip to content

Commit 83ba84f

Browse files
committed
Assert engine not got by trasnport worker threads
Relates ES-12215
1 parent 650ce32 commit 83ba84f

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3389,6 +3389,8 @@ public Engine getEngineOrNull() {
33893389
}
33903390

33913391
private Engine getCurrentEngine(boolean allowNoEngine) {
3392+
assert engineResetLock.isWriteLockedByCurrentThread()
3393+
|| assertCurrentThreadWithEngine("method IndexShard#getCurrentEngine (or one of its variant) can block");
33923394
assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */;
33933395
var engine = currentEngine.get();
33943396
if (engine == null && allowNoEngine == false) {
@@ -3441,7 +3443,7 @@ public <R> R withEngine(Function<Engine, R> operation) {
34413443
* @throws AlreadyClosedException if the current engine instance is {@code null}.
34423444
*/
34433445
public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R, E> operation) throws E {
3444-
assert assertCurrentThreadWithEngine();
3446+
assert assertCurrentThreadWithEngine("method IndexShard#withEngineException (or one of its variant) can block");
34453447
assert operation != null;
34463448

34473449
engineResetLock.readLock().lock();
@@ -3467,7 +3469,7 @@ public <R, E extends Exception> R withEngineException(CheckedFunction<Engine, R,
34673469
* @param <R> the type of the result
34683470
*/
34693471
private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
3470-
assert assertCurrentThreadWithEngine();
3472+
assert assertCurrentThreadWithEngine("method IndexShard#withEngine (or one of its variant) can block");
34713473
assert operation != null;
34723474

34733475
engineResetLock.readLock().lock();
@@ -3479,8 +3481,7 @@ private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) {
34793481
}
34803482
}
34813483

3482-
private static boolean assertCurrentThreadWithEngine() {
3483-
var message = "method IndexShard#withEngine (or one of its variant) can block";
3484+
private static boolean assertCurrentThreadWithEngine(String message) {
34843485
assert ClusterApplierService.assertNotClusterStateUpdateThread(message);
34853486
assert MasterService.assertNotMasterUpdateThread(message);
34863487
assert Transports.assertNotTransportThread(message);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.ccr.action;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionRequestValidationException;
@@ -403,7 +404,15 @@ protected void asyncShardOperation(final Request request, final ShardId shardId,
403404
throws IOException {
404405
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
405406
final IndexShard indexShard = indexService.getShard(request.getShard().id());
406-
final SeqNoStats seqNoStats = indexShard.seqNoStats();
407+
final SetOnce<SeqNoStats> seqNoStats = new SetOnce<>();
408+
threadPool.generic().execute(() -> { seqNoStats.set(indexShard.seqNoStats()); });
409+
while (seqNoStats.get() == null) {
410+
try {
411+
Thread.sleep(10); // wait for seqNoStats to be set
412+
} catch (InterruptedException e) {
413+
;
414+
}
415+
}
407416

408417
if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
409418
logger.trace(

0 commit comments

Comments
 (0)