Skip to content

Commit 4e5225f

Browse files
committed
Optimize heartbeat clear. Deprecate use-sync-indexing API
1 parent 3790298 commit 4e5225f

File tree

10 files changed

+34
-131
lines changed

10 files changed

+34
-131
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ protected CompletableFuture<FDBStoredRecord<Message>> recordIfInIndexedTypes(FDB
150150

151151
// buildIndexAsync - the main indexing function. Builds and commits indexes asynchronously; throttling to avoid overloading the system.
152152
@SuppressWarnings("PMD.CloseResource")
153-
public CompletableFuture<Void> buildIndexAsync(boolean markReadable, boolean useSyncLock) {
153+
public CompletableFuture<Void> buildIndexAsync(boolean markReadable) {
154154
KeyValueLogMessage message = KeyValueLogMessage.build("build index online",
155155
LogMessageKeys.SHOULD_MARK_READABLE, markReadable);
156156
long startNanos = System.nanoTime();
@@ -179,6 +179,8 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable, boolean use
179179
}
180180
return ret;
181181
})
182+
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
183+
// these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation.
182184
.thenCompose(ignore -> clearHeartbeats())
183185
.handle((ignore, exIgnore) -> {
184186
Throwable ex = indexingException.get();
@@ -322,6 +324,7 @@ public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease)
322324
if (ex != null) {
323325
throw ex;
324326
}
327+
heartbeat = null; // Here: heartbeats had been successfully cleared. No need to clear again
325328
return anythingChanged.get();
326329
});
327330
}
@@ -332,12 +335,14 @@ private CompletableFuture<Boolean> markIndexReadableSingleTarget(Index index, At
332335
return getRunner().runAsync(context ->
333336
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
334337
.thenCompose(store -> {
338+
clearHeartbeatSingleTarget(store, index);
335339
return policy.shouldAllowUniquePendingState(store) ?
336340
store.markIndexReadableOrUniquePending(index) :
337341
store.markIndexReadable(index);
338342
})
339343
).handle((changed, ex) -> {
340344
if (ex == null) {
345+
heartbeat = null; // Here: all heartbeats were successfully cleared withing the set readable transactions. No need to clear again.
341346
if (Boolean.TRUE.equals(changed)) {
342347
anythingChanged.set(true);
343348
}
@@ -358,7 +363,7 @@ public void enforceStampOverwrite() {
358363
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
359364
// continuedBuild is set if this session isn't a continuation of a previous indexing
360365
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);
361-
heartbeat = new IndexingHeartbeat(common.getUuid(), indexingTypeStamp.getMethod());
366+
heartbeat = new IndexingHeartbeat(common.getUuid(), indexingTypeStamp.getMethod(), common.config.getLeaseLengthMillis());
362367

363368
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp));
364369
}
@@ -850,23 +855,28 @@ private CompletableFuture<Void> updateHeartbeat(boolean validate, FDBRecordStore
850855
}
851856

852857
private CompletableFuture<Void> clearHeartbeats() {
853-
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
854-
// these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation.
855858
if (heartbeat == null) {
856859
return AsyncUtil.DONE;
857860
}
858-
return forEachTargetIndex(this::clearHeartbeatSingleTarget);
861+
return forEachTargetIndex(this::clearHeartbeatSingleTarget)
862+
.thenAccept(ignore -> heartbeat = null);
859863
}
860864

861865
private CompletableFuture<Void> clearHeartbeatSingleTarget(Index index) {
862866
return getRunner().runAsync(context ->
863867
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
864868
.thenApply(store -> {
865-
heartbeat.clearHeartbeat(store, index);
869+
clearHeartbeatSingleTarget(store, index);
866870
return null;
867871
}));
868872
}
869873

874+
private void clearHeartbeatSingleTarget(FDBRecordStore store, Index index) {
875+
if (heartbeat != null) {
876+
heartbeat.clearHeartbeat(store, index);
877+
}
878+
}
879+
870880

871881
private boolean shouldValidate() {
872882
final long minimalInterval = policy.getCheckIndexingMethodFrequencyMilliseconds();

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,18 @@
3333
import javax.annotation.Nonnull;
3434
import java.util.UUID;
3535
import java.util.concurrent.CompletableFuture;
36-
import java.util.concurrent.TimeUnit;
3736

3837
public class IndexingHeartbeat {
3938
// [prefix, xid] -> [indexing-type, genesis time, heartbeat time]
4039
final UUID sessionId;
4140
final IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod;
4241
final long genesisTimeMilliseconds;
42+
final long leaseLength;
4343

44-
public IndexingHeartbeat(final UUID sessionId, IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod) {
44+
public IndexingHeartbeat(final UUID sessionId, IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod, long leaseLength) {
4545
this.sessionId = sessionId;
4646
this.indexingMethod = indexingMethod;
47+
this.leaseLength = leaseLength;
4748
this.genesisTimeMilliseconds = nowMilliseconds();
4849
}
4950

@@ -94,7 +95,7 @@ private void validateNonCompetingHeartbeat(KeyValue kv) {
9495
try {
9596
final IndexBuildProto.IndexingHeartbeat otherHeartbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
9697
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
97-
if (age > 0 && age < TimeUnit.SECONDS.toMillis(10)) {
98+
if (age > 0 && age < leaseLength) {
9899
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress");
99100
// TODO: log details
100101
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationBaseBuilder.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.apple.foundationdb.record.provider.foundationdb;
2222

23+
import com.apple.foundationdb.annotation.API;
2324
import com.apple.foundationdb.record.RecordCoreException;
2425
import com.apple.foundationdb.record.RecordMetaData;
2526
import com.apple.foundationdb.record.RecordMetaDataProvider;
@@ -715,15 +716,17 @@ public B setTransactionTimeLimitMilliseconds(long timeLimitMilliseconds) {
715716
* @see SynchronizedSessionRunner
716717
* @param useSynchronizedSession use synchronize session if true, otherwise false
717718
* @return this builder
719+
*
720+
* @deprecated Synchronized sessions are now determined by the indexing method.
718721
*/
722+
@API(API.Status.DEPRECATED)
723+
@Deprecated(since = "4.4.3.0", forRemoval = true)
719724
public B setUseSynchronizedSession(boolean useSynchronizedSession) {
720-
configBuilder.setUseSynchronizedSession(useSynchronizedSession);
721725
return self();
722726
}
723727

724728
/**
725729
* Set the lease length in milliseconds if the synchronized session is used. The default value is {@link OnlineIndexOperationConfig#DEFAULT_LEASE_LENGTH_MILLIS}.
726-
* @see #setUseSynchronizedSession(boolean)
727730
* @see com.apple.foundationdb.synchronizedsession.SynchronizedSession
728731
* @param leaseLengthMillis length between last access and lease's end time in milliseconds
729732
* @return this builder

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,13 @@ public class OnlineIndexOperationConfig {
7777
private final int increaseLimitAfter;
7878
private final long timeLimitMilliseconds;
7979
private final long transactionTimeLimitMilliseconds;
80-
private final boolean useSynchronizedSession;
8180
private final long leaseLengthMillis;
8281

8382
public static final long UNLIMITED_TIME = 0;
8483

8584
OnlineIndexOperationConfig(int maxLimit, int initialLimit, int maxRetries, int recordsPerSecond, long progressLogIntervalMillis, int increaseLimitAfter,
8685
int maxWriteLimitBytes, long timeLimitMilliseconds, long transactionTimeLimitMilliseconds,
87-
boolean useSynchronizedSession, long leaseLengthMillis) {
86+
long leaseLengthMillis) {
8887
this.maxLimit = maxLimit;
8988
this.initialLimit = initialLimit;
9089
this.maxRetries = maxRetries;
@@ -94,7 +93,6 @@ public class OnlineIndexOperationConfig {
9493
this.maxWriteLimitBytes = maxWriteLimitBytes;
9594
this.timeLimitMilliseconds = timeLimitMilliseconds;
9695
this.transactionTimeLimitMilliseconds = transactionTimeLimitMilliseconds;
97-
this.useSynchronizedSession = useSynchronizedSession;
9896
this.leaseLengthMillis = leaseLengthMillis;
9997
}
10098

@@ -188,10 +186,6 @@ public static Builder newBuilder() {
188186
return new Builder();
189187
}
190188

191-
public boolean shouldUseSynchronizedSession() {
192-
return useSynchronizedSession;
193-
}
194-
195189
public long getLeaseLengthMillis() {
196190
return leaseLengthMillis;
197191
}
@@ -213,7 +207,6 @@ public Builder toBuilder() {
213207
.setMaxRetries(this.maxRetries)
214208
.setTimeLimitMilliseconds(timeLimitMilliseconds)
215209
.setTransactionTimeLimitMilliseconds(this.transactionTimeLimitMilliseconds)
216-
.setUseSynchronizedSession(useSynchronizedSession)
217210
.setLeaseLengthMillis(leaseLengthMillis);
218211
}
219212

@@ -234,7 +227,6 @@ public static class Builder {
234227
private long timeLimitMilliseconds = UNLIMITED_TIME;
235228
private long transactionTimeLimitMilliseconds = DEFAULT_TRANSACTION_TIME_LIMIT;
236229
private long leaseLengthMillis = DEFAULT_LEASE_LENGTH_MILLIS;
237-
private boolean useSynchronizedSession = true;
238230

239231
protected Builder() {
240232

@@ -492,15 +484,18 @@ public Builder setTransactionTimeLimitMilliseconds(long timeLimitMilliseconds) {
492484
* @see SynchronizedSessionRunner
493485
* @param useSynchronizedSession use synchronize session if true, otherwise false
494486
* @return this builder
487+
*
488+
* @deprecated Synchronized sessions are now determined by the indexing method.
495489
*/
490+
@API(API.Status.DEPRECATED)
491+
@Deprecated(since = "4.4.3.0", forRemoval = true)
496492
public Builder setUseSynchronizedSession(boolean useSynchronizedSession) {
497-
this.useSynchronizedSession = useSynchronizedSession;
493+
// no-op
498494
return this;
499495
}
500496

501497
/**
502498
* Set the lease length in milliseconds if the synchronized session is used. By default this is {@link #DEFAULT_LEASE_LENGTH_MILLIS}.
503-
* @see #setUseSynchronizedSession(boolean)
504499
* @see com.apple.foundationdb.synchronizedsession.SynchronizedSession
505500
* @param leaseLengthMillis length between last access and lease's end time in milliseconds
506501
* @return this builder
@@ -519,7 +514,7 @@ public Builder setLeaseLengthMillis(long leaseLengthMillis) {
519514
public OnlineIndexOperationConfig build() {
520515
return new OnlineIndexOperationConfig(maxLimit, initialLimit, maxRetries, recordsPerSecond, progressLogIntervalMillis, increaseLimitAfter,
521516
maxWriteLimitBytes, timeLimitMilliseconds, transactionTimeLimitMilliseconds,
522-
useSynchronizedSession, leaseLengthMillis);
517+
leaseLengthMillis);
523518
}
524519
}
525520
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexScrubber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private IndexingBase getScrubber(IndexScrubbingTools.ScrubbingType type, AtomicL
7575
@Nonnull
7676
private CompletableFuture<Void> scrubIndexAsync(IndexScrubbingTools.ScrubbingType type, AtomicLong count) {
7777
return AsyncUtil.composeHandle(
78-
getScrubber(type, count).buildIndexAsync(false, common.config.shouldUseSynchronizedSession()),
78+
getScrubber(type, count).buildIndexAsync(false),
7979
(ignore, ex) -> {
8080
if (ex != null) {
8181
throw FDBExceptions.wrapException(ex);

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -501,11 +501,6 @@ public static CompletableFuture<Boolean> checkAnyOngoingOnlineIndexBuildsAsync(@
501501
* any retriable errors that it encounters while it runs the build. At the end, it marks the index readable in the
502502
* store.
503503
* </p>
504-
* <p>
505-
* One may consider to set the index state precondition to {@link IndexStatePrecondition#ERROR_IF_DISABLED_CONTINUE_IF_WRITE_ONLY}
506-
* and {@link OnlineIndexer.Builder#setUseSynchronizedSession(boolean)} to {@code false}, which makes the indexer
507-
* follow the same behavior as before version 2.8.90.0. But it is not recommended.
508-
* </p>
509504
* @return a future that will be ready when the build has completed
510505
* @throws com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException the build is stopped
511506
* because there may be another build running actively on this index.
@@ -518,8 +513,7 @@ public CompletableFuture<Void> buildIndexAsync() {
518513
@VisibleForTesting
519514
@Nonnull
520515
CompletableFuture<Void> buildIndexAsync(boolean markReadable) {
521-
boolean useSyncLock = (!indexingPolicy.isMutual() || fallbackToRecordsScan) && common.config.shouldUseSynchronizedSession();
522-
return indexingLauncher(() -> getIndexer().buildIndexAsync(markReadable, useSyncLock));
516+
return indexingLauncher(() -> getIndexer().buildIndexAsync(markReadable));
523517
}
524518

525519
/**
@@ -763,11 +757,6 @@ public Builder setRecordTypes(@Nullable Collection<RecordType> recordTypes) {
763757
* Set how should {@link #buildIndexAsync()} (or its variations) build the index based on its state. Normally
764758
* this should be {@link IndexStatePrecondition#BUILD_IF_DISABLED_CONTINUE_BUILD_IF_WRITE_ONLY} if the index is
765759
* not corrupted.
766-
* <p>
767-
* One may consider setting it to {@link IndexStatePrecondition#ERROR_IF_DISABLED_CONTINUE_IF_WRITE_ONLY} and
768-
* {@link #setUseSynchronizedSession(boolean)} to {@code false}, which makes the indexer follow the same behavior
769-
* as before version 2.8.90.0. But it is not recommended.
770-
* </p>
771760
* @see IndexStatePrecondition
772761
* @param indexStatePrecondition build option to use
773762
* @return this builder
@@ -1449,7 +1438,6 @@ public Builder setMutualIndexing() {
14491438
* by other threads/processes/systems with the exact same parameters, are attempting to concurrently build this
14501439
* index. To allow that, the indexer will:
14511440
* <ol>
1452-
* <li> Avoid the indexing lock - i.e. assume that {@link OnlineIndexer.Builder#setUseSynchronizedSession(boolean)} was called with false</li>
14531441
* <li> Divide the records space to fragments, then iterate the fragments in a way that minimize the interference, while
14541442
* indexing each fragment independently.</li>
14551443
* <li> Handle indexing conflicts, when occurred.</li>

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerBuildIndexTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ <M extends Message> void singleRebuild(
180180
if (!safeBuild) {
181181
indexingPolicy.setIfDisabled(OnlineIndexer.IndexingPolicy.DesiredAction.ERROR)
182182
.setIfMismatchPrevious(OnlineIndexer.IndexingPolicy.DesiredAction.ERROR);
183-
builder.setUseSynchronizedSession(false);
184183
}
185184
if (sourceIndex != null) {
186185
indexingPolicy.setSourceIndex(sourceIndex.getName())

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import com.apple.foundationdb.record.IndexBuildProto;
2424
import com.apple.foundationdb.record.RecordCoreException;
2525
import com.apple.foundationdb.record.TestRecords1Proto;
26-
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
27-
import com.apple.foundationdb.record.logging.LogMessageKeys;
2826
import com.apple.foundationdb.record.metadata.Index;
2927
import com.apple.foundationdb.record.metadata.IndexOptions;
3028
import com.apple.foundationdb.record.metadata.IndexTypes;
@@ -1158,45 +1156,4 @@ void testIndexFromIndexBlock() {
11581156
assertReadable(tgtIndex);
11591157
scrubAndValidate(List.of(tgtIndex));
11601158
}
1161-
1162-
@Test
1163-
void testIndexFromIndexIgnoreSyncLock() {
1164-
1165-
final long numRecords = 180;
1166-
1167-
Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
1168-
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed"), IndexTypes.VALUE);
1169-
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex);
1170-
1171-
populateData(numRecords);
1172-
1173-
openSimpleMetaData(hook);
1174-
buildIndexClean(srcIndex);
1175-
disableAll(List.of(tgtIndex));
1176-
1177-
openSimpleMetaData(hook);
1178-
1179-
IntStream.rangeClosed(0, 4).parallel().forEach(id -> {
1180-
snooze(100 - id);
1181-
try {
1182-
try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex)
1183-
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
1184-
.setSourceIndex("src_index")
1185-
.forbidRecordScan())
1186-
.setLimit(5)
1187-
.setUseSynchronizedSession(id == 0)
1188-
.setMaxRetries(100) // enough to avoid giving up
1189-
.build()) {
1190-
indexBuilder.buildIndex(true);
1191-
}
1192-
} catch (IndexingBase.UnexpectedReadableException ex) {
1193-
LOGGER.info(KeyValueLogMessage.of("Ignoring lock, got exception",
1194-
LogMessageKeys.SESSION_ID, id,
1195-
LogMessageKeys.ERROR, ex.getMessage()));
1196-
}
1197-
});
1198-
1199-
assertReadable(List.of(tgtIndex));
1200-
scrubAndValidate(List.of(tgtIndex));
1201-
}
12021159
}

0 commit comments

Comments
 (0)