Skip to content

Commit de3f416

Browse files
committed
defer engine
1 parent f2da65e commit de3f416

File tree

2 files changed

+63
-4
lines changed

2 files changed

+63
-4
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.index.Index;
5151
import org.elasticsearch.index.IndexNotFoundException;
5252
import org.elasticsearch.index.IndexService;
53+
import org.elasticsearch.index.seqno.SeqNoStats;
5354
import org.elasticsearch.index.seqno.SequenceNumbers;
5455
import org.elasticsearch.index.shard.IndexShard;
5556
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -78,6 +79,7 @@
7879
import java.util.Optional;
7980
import java.util.concurrent.Executor;
8081
import java.util.concurrent.atomic.AtomicBoolean;
82+
import java.util.function.Supplier;
8183

8284
import static org.elasticsearch.core.Strings.format;
8385

@@ -507,7 +509,8 @@ public void handleException(TransportException exp) {
507509

508510
if (syncGlobalCheckpointAfterOperation) {
509511
try {
510-
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
512+
var seqNoStats = primaryShardReference.seqNoStatsSupplier.get();
513+
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation", seqNoStats);
511514
} catch (final Exception e) {
512515
// only log non-closed exceptions
513516
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
@@ -1136,10 +1139,16 @@ class PrimaryShardReference
11361139

11371140
protected final IndexShard indexShard;
11381141
private final Releasable operationLock;
1142+
private final Supplier<SeqNoStats> seqNoStatsSupplier;
1143+
private final Supplier<Long> localCheckpointSupplier;
1144+
private final Supplier<Long> globalCheckpointSupplier;
11391145

11401146
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
11411147
this.indexShard = indexShard;
11421148
this.operationLock = operationLock;
1149+
this.seqNoStatsSupplier = indexShard.getSeqNoStatsSupplier();
1150+
this.localCheckpointSupplier = indexShard.getLocalCheckpointSupplier();
1151+
this.globalCheckpointSupplier = indexShard.getLastSyncedGlobalCheckpointSupplier();
11431152
}
11441153

11451154
@Override
@@ -1189,12 +1198,12 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
11891198

11901199
@Override
11911200
public long localCheckpoint() {
1192-
return indexShard.getLocalCheckpoint();
1201+
return localCheckpointSupplier.get();
11931202
}
11941203

11951204
@Override
11961205
public long globalCheckpoint() {
1197-
return indexShard.getLastSyncedGlobalCheckpoint();
1206+
return globalCheckpointSupplier.get();
11981207
}
11991208

12001209
@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1351,6 +1351,18 @@ public SeqNoStats seqNoStats() {
13511351
return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
13521352
}
13531353

1354+
/**
1355+
* Returns a supplier that supplies the {@link SeqNoStats} of the engine that is referenced at the time this method is called.
1356+
* Uses this method in place where the current engine reference cannot be resolved directly.
1357+
*
1358+
* @return a supplier of {@link SeqNoStats}
1359+
* @throws AlreadyClosedException if shard is closed
1360+
*/
1361+
public Supplier<SeqNoStats> getSeqNoStatsSupplier() {
1362+
var engine = getEngine();
1363+
return () -> engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
1364+
}
1365+
13541366
public IndexingStats indexingStats() {
13551367
Engine engine = getEngineOrNull();
13561368
final boolean throttled;
@@ -2974,6 +2986,18 @@ public long getLocalCheckpoint() {
29742986
return getEngine().getPersistedLocalCheckpoint();
29752987
}
29762988

2989+
/**
2990+
* Returns a supplier that supplies the local checkpoint of the engine that is referenced at the time this method is called.
2991+
* Uses this method in place where the current engine reference cannot be resolved directly.
2992+
*
2993+
* @return a supplier of the local checkpoint
2994+
* @throws AlreadyClosedException if shard is closed
2995+
*/
2996+
public Supplier<Long> getLocalCheckpointSupplier() {
2997+
var engine = getEngine();
2998+
return () -> engine.getPersistedLocalCheckpoint();
2999+
}
3000+
29773001
/**
29783002
* Returns the global checkpoint for the shard.
29793003
*
@@ -2990,6 +3014,18 @@ public long getLastSyncedGlobalCheckpoint() {
29903014
return getEngine().getLastSyncedGlobalCheckpoint();
29913015
}
29923016

3017+
/**
3018+
* Returns a supplier that supplies the latest global checkpoint of the engine that is referenced at the time this method is called.
3019+
* Uses this method in place where the current engine reference cannot be resolved directly.
3020+
*
3021+
* @return a supplier of the latest global checkpoint value that has been persisted in the underlying storage
3022+
* @throws AlreadyClosedException if shard is closed
3023+
*/
3024+
public Supplier<Long> getLastSyncedGlobalCheckpointSupplier() {
3025+
var engine = getEngine();
3026+
return () -> engine.getLastSyncedGlobalCheckpoint();
3027+
}
3028+
29933029
/**
29943030
* Get the local knowledge of the global checkpoints for all in-sync allocation IDs.
29953031
*
@@ -3012,8 +3048,22 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
30123048
return;
30133049
}
30143050
assert assertPrimaryMode();
3015-
// only sync if there are no operations in flight, or when using async durability
30163051
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
3052+
doMaybeSyncGlobalCheckpoint(reason, stats);
3053+
}
3054+
3055+
public void maybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) {
3056+
verifyNotClosed();
3057+
assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard";
3058+
if (replicationTracker.isPrimaryMode() == false) {
3059+
return;
3060+
}
3061+
assert assertPrimaryMode();
3062+
doMaybeSyncGlobalCheckpoint(reason, stats);
3063+
}
3064+
3065+
private void doMaybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) {
3066+
// only sync if there are no operations in flight, or when using async durability
30173067
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
30183068
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
30193069
final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync();

0 commit comments

Comments
 (0)