Skip to content

Commit 0cf80f4

Browse files
authored
Move global checkpoint sync to write threadpool (#96364)
This commit moves the global checkpoint sync action to the write thread pool. Additionally, it routes global checkpoint syncs through the same pathway as translog location syncs, so that the two kinds of sync work-steal against each other instead of generating independent syncs.
1 parent dae0c6f commit 0cf80f4

File tree

12 files changed

+211
-90
lines changed

12 files changed

+211
-90
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
3737
.put("thread_pool.search.size", 1)
3838
.put("thread_pool.search.queue_size", 1)
3939
.put("thread_pool.write.size", 1)
40-
.put("thread_pool.write.queue_size", 1)
40+
// Needs to be 2 since we have concurrent indexing and global checkpoint syncs
41+
.put("thread_pool.write.queue_size", 2)
4142
.put("thread_pool.get.size", 1)
4243
.put("thread_pool.get.queue_size", 1)
4344
.build();

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ public void testDurableFlagHasEffect() {
160160
Translog.Location lastWriteLocation = tlog.getLastWriteLocation();
161161
try {
162162
// the lastWriteLocation has an Integer.MAX_VALUE size so we have to create a new one
163-
return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0));
163+
return tlog.ensureSynced(
164+
new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0),
165+
SequenceNumbers.UNASSIGNED_SEQ_NO
166+
);
164167
} catch (IOException e) {
165168
throw new UncheckedIOException(e);
166169
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,11 @@ public enum SearcherScope {
758758
*/
759759
public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener);
760760

761+
/**
762+
* Ensures that the global checkpoint has been persisted to the underlying storage.
763+
*/
764+
public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener);
765+
761766
public abstract void syncTranslog() throws IOException;
762767

763768
/**

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public class InternalEngine extends Engine {
179179
private final SoftDeletesPolicy softDeletesPolicy;
180180
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
181181
private final FlushListeners flushListener;
182-
private final AsyncIOProcessor<Translog.Location> translogSyncProcessor;
182+
private final AsyncIOProcessor<Tuple<Long, Translog.Location>> translogSyncProcessor;
183183

184184
private final CompletionStatsCache completionStatsCache;
185185

@@ -617,12 +617,23 @@ public boolean isTranslogSyncNeeded() {
617617
return getTranslog().syncNeeded();
618618
}
619619

620-
private AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
620+
private AsyncIOProcessor<Tuple<Long, Translog.Location>> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
621621
return new AsyncIOProcessor<>(logger, 1024, threadContext) {
622622
@Override
623-
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
623+
protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>>> candidates) throws IOException {
624624
try {
625-
final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1));
625+
Translog.Location location = Translog.Location.EMPTY;
626+
long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
627+
for (Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>> syncMarkers : candidates) {
628+
Tuple<Long, Translog.Location> marker = syncMarkers.v1();
629+
long globalCheckpointToSync = marker.v1();
630+
if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) {
631+
processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync);
632+
}
633+
location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2();
634+
}
635+
636+
final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint);
626637
if (synced) {
627638
revisitIndexDeletionPolicyOnTranslogSynced();
628639
}
@@ -639,7 +650,12 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
639650

640651
@Override
641652
public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener) {
642-
translogSyncProcessor.put(location, listener);
653+
translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener);
654+
}
655+
656+
@Override
657+
public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
658+
translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener);
643659
}
644660

645661
@Override

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,11 @@ public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Excep
338338
listener.accept(null);
339339
}
340340

341+
@Override
342+
public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
343+
listener.accept(null);
344+
}
345+
341346
@Override
342347
public void syncTranslog() {}
343348

server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ public GlobalCheckpointSyncAction(
6262
actionFilters,
6363
Request::new,
6464
Request::new,
65-
ThreadPool.Names.MANAGEMENT
65+
ThreadPool.Names.WRITE,
66+
false,
67+
true
6668
);
6769
}
6870

@@ -77,24 +79,26 @@ protected void shardOperationOnPrimary(
7779
IndexShard indexShard,
7880
ActionListener<PrimaryResult<Request, ReplicationResponse>> listener
7981
) {
80-
ActionListener.completeWith(listener, () -> {
81-
maybeSyncTranslog(indexShard);
82-
return new PrimaryResult<>(request, new ReplicationResponse());
83-
});
82+
maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse())));
8483
}
8584

8685
@Override
8786
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
88-
ActionListener.completeWith(listener, () -> {
89-
maybeSyncTranslog(replica);
90-
return new ReplicaResult();
91-
});
87+
maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult()));
9288
}
9389

94-
private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
90+
private static <T> void maybeSyncTranslog(IndexShard indexShard, ActionListener<Void> listener) {
9591
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
9692
&& indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
97-
indexShard.sync();
93+
indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> {
94+
if (e == null) {
95+
listener.onResponse(null);
96+
} else {
97+
listener.onFailure(e);
98+
}
99+
});
100+
} else {
101+
listener.onResponse(null);
98102
}
99103
}
100104

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3615,6 +3615,17 @@ public final void sync(Translog.Location location, Consumer<Exception> syncListe
36153615
getEngine().asyncEnsureTranslogSynced(location, syncListener);
36163616
}
36173617

3618+
/**
3619+
* This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync
3620+
* if the requested global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync, it
3621+
* will not ensure that the requested global checkpoint is available to be synced. It is the caller's duty to only call this
3622+
* method with a valid processed global checkpoint that is available to sync.
3623+
*/
3624+
public void syncGlobalCheckpoint(long globalCheckpoint, Consumer<Exception> syncListener) {
3625+
verifyNotClosed();
3626+
getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener);
3627+
}
3628+
36183629
public void sync() throws IOException {
36193630
verifyNotClosed();
36203631
getEngine().syncTranslog();

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.Iterator;
5252
import java.util.List;
5353
import java.util.Objects;
54-
import java.util.Optional;
5554
import java.util.OptionalLong;
5655
import java.util.concurrent.atomic.AtomicBoolean;
5756
import java.util.concurrent.locks.ReadWriteLock;
@@ -839,15 +838,18 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
839838
}
840839

841840
/**
842-
* Ensures that the given location has be synced / written to the underlying storage.
841+
* Ensures that the given location and global checkpoint have been synced / written to the underlying storage.
843842
*
844843
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
845844
*/
846-
public boolean ensureSynced(Location location) throws IOException {
845+
public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException {
847846
try (ReleasableLock lock = readLock.acquire()) {
848-
if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
847+
// if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's
848+
// already synced
849+
long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint;
850+
if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) {
849851
ensureOpen();
850-
return current.syncUpTo(location.translogLocation + location.size);
852+
return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint);
851853
}
852854
} catch (final Exception ex) {
853855
closeOnTragicEvent(ex);
@@ -856,24 +858,6 @@ public boolean ensureSynced(Location location) throws IOException {
856858
return false;
857859
}
858860

859-
/**
860-
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
861-
* This method allows for internal optimization to minimize the amount of fsync operations if multiple
862-
* locations must be synced.
863-
*
864-
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
865-
*/
866-
public boolean ensureSynced(Stream<Location> locations) throws IOException {
867-
final Optional<Location> max = locations.max(Location::compareTo);
868-
// we only need to sync the max location since it will sync all other
869-
// locations implicitly
870-
if (max.isPresent()) {
871-
return ensureSynced(max.get());
872-
} else {
873-
return false;
874-
}
875-
}
876-
877861
/**
878862
* Closes the translog if the current translog writer experienced a tragic exception.
879863
*
@@ -929,6 +913,8 @@ public TranslogDeletionPolicy getDeletionPolicy() {
929913

930914
public static class Location implements Comparable<Location> {
931915

916+
/**
 * Shared placeholder location (generation 0, translog location 0, size 0). It is passed where a sync request
 * carries no translog location of its own, e.g. a global-checkpoint-only sync. Declared final so the shared
 * instance cannot be reassigned.
 */
public static final Location EMPTY = new Location(0, 0, 0);
917+
932918
public final long generation;
933919
public final long translogLocation;
934920
public final int size;

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
346346
* raising the exception.
347347
*/
348348
public void sync() throws IOException {
349-
syncUpTo(Long.MAX_VALUE);
349+
syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
350350
}
351351

352352
/**
@@ -462,10 +462,17 @@ private long getWrittenOffset() throws IOException {
462462
*
463463
* @return <code>true</code> if this call caused an actual sync operation
464464
*/
465-
final boolean syncUpTo(long offset) throws IOException {
466-
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
465+
final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
466+
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
467+
assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong()
468+
: "globalCheckpointToPersist ["
469+
+ globalCheckpointToPersist
470+
+ "] greater than global checkpoint ["
471+
+ globalCheckpointSupplier.getAsLong()
472+
+ "]";
467473
synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
468-
if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
474+
if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
475+
&& syncNeeded()) {
469476
// double checked locking - we don't want to fsync unless we have to and now that we have
470477
// the lock we should check again since if this code is busy we might have fsynced enough already
471478
final Checkpoint checkpointToSync;

server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
import org.elasticsearch.transport.TransportService;
2828

2929
import java.util.Collections;
30+
import java.util.function.Consumer;
3031

3132
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.ArgumentMatchers.anyLong;
35+
import static org.mockito.ArgumentMatchers.eq;
36+
import static org.mockito.Mockito.doAnswer;
3237
import static org.mockito.Mockito.mock;
3338
import static org.mockito.Mockito.never;
3439
import static org.mockito.Mockito.verify;
@@ -100,6 +105,11 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
100105

101106
when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint);
102107
when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint);
108+
doAnswer(invocation -> {
109+
Consumer<Exception> argument = invocation.getArgument(1);
110+
argument.accept(null);
111+
return null;
112+
}).when(indexShard).syncGlobalCheckpoint(anyLong(), any());
103113

104114
final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
105115
Settings.EMPTY,
@@ -123,9 +133,10 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
123133

124134
if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
125135
verify(indexShard, never()).sync();
136+
verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any());
126137
} else {
127-
verify(indexShard).sync();
138+
verify(indexShard, never()).sync();
139+
verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any());
128140
}
129141
}
130-
131142
}

0 commit comments

Comments
 (0)