Skip to content

Commit 6d2a95a

Browse files
committed
Online Indexer: replace the synchronized runner with a heartbeat
Each indexing session, for each index, will create a key-value heartbeat of the format: [prefix, xid] -> [indexing-type, genesis time, heartbeat time] Indexing session that are expected to be exclusive will throw an exception if another, active, session exists. Motivation: 1. Represent the heartbeat in every index during multi target indexing (currently - only the master index has a sync lock) 2. Keep a heartbeat during mutual indexing, which can allow better automatic decision making 3. Decide about exclusiveness according to the indexing method (currently - user input) Resolve #3529
1 parent 66560f3 commit 6d2a95a

File tree

7 files changed

+199
-119
lines changed

7 files changed

+199
-119
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 46 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,10 @@
4545
import com.apple.foundationdb.record.provider.common.StoreTimer;
4646
import com.apple.foundationdb.record.provider.common.StoreTimerSnapshot;
4747
import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet;
48-
import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner;
4948
import com.apple.foundationdb.record.query.plan.RecordQueryPlanner;
5049
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordFromStoredRecordPlan;
5150
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner;
5251
import com.apple.foundationdb.subspace.Subspace;
53-
import com.apple.foundationdb.synchronizedsession.SynchronizedSession;
54-
import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException;
5552
import com.apple.foundationdb.tuple.ByteArrayUtil2;
5653
import com.apple.foundationdb.tuple.Tuple;
5754
import com.google.protobuf.Message;
@@ -81,7 +78,6 @@
8178
import java.util.concurrent.atomic.AtomicReference;
8279
import java.util.function.BiFunction;
8380
import java.util.function.Function;
84-
import java.util.function.Supplier;
8581
import java.util.stream.Collectors;
8682

8783
/**
@@ -106,6 +102,7 @@ public abstract class IndexingBase {
106102
private final long startingTimeMillis;
107103
private long lastTypeStampCheckMillis;
108104
private Map<String, IndexingMerger> indexingMergerMap = null;
105+
private IndexingHeartbeat heartbeat = null; // this will stay null for index scrubbing
109106

110107
IndexingBase(@Nonnull IndexingCommon common,
111108
@Nonnull OnlineIndexer.IndexingPolicy policy) {
@@ -157,28 +154,13 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable, boolean use
157154
KeyValueLogMessage message = KeyValueLogMessage.build("build index online",
158155
LogMessageKeys.SHOULD_MARK_READABLE, markReadable);
159156
long startNanos = System.nanoTime();
160-
final CompletableFuture<Void> buildIndexAsyncFuture;
161157
FDBDatabaseRunner runner = getRunner();
162-
Index index = common.getPrimaryIndex();
163-
if (runner.getTimer() != null) {
164-
lastProgressSnapshot = StoreTimerSnapshot.from(runner.getTimer());
158+
final FDBStoreTimer timer = runner.getTimer();
159+
if ( timer != null) {
160+
lastProgressSnapshot = StoreTimerSnapshot.from(timer);
165161
}
166-
if (useSyncLock) {
167-
buildIndexAsyncFuture = runner
168-
.runAsync(context -> openRecordStore(context).thenApply(store -> IndexingSubspaces.indexBuildLockSubspace(store, index)),
169-
common.indexLogMessageKeyValues("IndexingBase::indexBuildLockSubspace"))
170-
.thenCompose(lockSubspace -> runner.startSynchronizedSessionAsync(lockSubspace, common.config.getLeaseLengthMillis()))
171-
.thenCompose(synchronizedRunner -> {
172-
message.addKeyAndValue(LogMessageKeys.SESSION_ID, synchronizedRunner.getSessionId());
173-
return runWithSynchronizedRunnerAndEndSession(synchronizedRunner,
174-
() -> handleStateAndDoBuildIndexAsync(markReadable, message));
175-
});
176-
} else {
177-
message.addKeyAndValue(LogMessageKeys.SESSION_ID, "none");
178-
common.setSynchronizedSessionRunner(null);
179-
buildIndexAsyncFuture = handleStateAndDoBuildIndexAsync(markReadable, message);
180-
}
181-
return buildIndexAsyncFuture.whenComplete((vignore, ex) -> {
162+
message.addKeyAndValue(LogMessageKeys.SESSION_ID, common.getUuid());
163+
return handleStateAndDoBuildIndexAsync(markReadable, message).whenComplete((vignore, ex) -> {
182164
message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build
183165
.addKeysAndValues(common.indexLogMessageKeyValues())
184166
.addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos));
@@ -193,36 +175,6 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable, boolean use
193175
});
194176
}
195177

196-
@SuppressWarnings("PMD.CloseResource")
197-
private <T> CompletableFuture<T> runWithSynchronizedRunnerAndEndSession(
198-
@Nonnull SynchronizedSessionRunner newSynchronizedRunner, @Nonnull Supplier<CompletableFuture<T>> runnable) {
199-
final SynchronizedSessionRunner currentSynchronizedRunner1 = common.getSynchronizedSessionRunner();
200-
if (currentSynchronizedRunner1 == null) {
201-
common.setSynchronizedSessionRunner(newSynchronizedRunner);
202-
return MoreAsyncUtil.composeWhenComplete(runnable.get(), (result, ex) -> {
203-
final SynchronizedSessionRunner currentSynchronizedRunner2 = common.getSynchronizedSessionRunner();
204-
if (newSynchronizedRunner.equals(currentSynchronizedRunner2)) {
205-
common.setSynchronizedSessionRunner(null);
206-
} else {
207-
if (LOGGER.isWarnEnabled()) {
208-
LOGGER.warn(KeyValueLogMessage.build("synchronizedSessionRunner was modified during the run",
209-
LogMessageKeys.SESSION_ID, newSynchronizedRunner.getSessionId(),
210-
LogMessageKeys.INDEXER_SESSION_ID, currentSynchronizedRunner2 == null ? null : currentSynchronizedRunner2.getSessionId())
211-
.addKeysAndValues(common.indexLogMessageKeyValues())
212-
.toString());
213-
}
214-
}
215-
return newSynchronizedRunner.endSessionAsync();
216-
}, getRunner().getDatabase()::mapAsyncToSyncException);
217-
} else {
218-
return newSynchronizedRunner.endSessionAsync().thenApply(vignore -> {
219-
throw new RecordCoreException("another synchronized session is running on the indexer",
220-
LogMessageKeys.SESSION_ID, newSynchronizedRunner.getSessionId(),
221-
LogMessageKeys.INDEXER_SESSION_ID, currentSynchronizedRunner1.getSessionId());
222-
});
223-
}
224-
}
225-
226178
abstract List<Object> indexingLogMessageKeyValues();
227179

228180
@Nonnull
@@ -314,7 +266,7 @@ private CompletableFuture<Void> markIndexesWriteOnly(boolean continueBuild, FDBR
314266
@Nonnull
315267
public CompletableFuture<Boolean> markReadableIfBuilt() {
316268
AtomicBoolean allReadable = new AtomicBoolean(true);
317-
return common.getNonSynchronizedRunner().runAsync(context -> openRecordStore(context).thenCompose(store ->
269+
return common.getRunner().runAsync(context -> openRecordStore(context).thenCompose(store ->
318270
forEachTargetIndex(index -> {
319271
if (store.isIndexReadable(index)) {
320272
return AsyncUtil.DONE;
@@ -335,6 +287,7 @@ public CompletableFuture<Boolean> markReadableIfBuilt() {
335287
).thenApply(ignore -> allReadable.get()), common.indexLogMessageKeyValues("IndexingBase::markReadableIfBuilt"));
336288
}
337289

290+
338291
@Nonnull
339292
public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease) {
340293
if (!markReadablePlease) {
@@ -360,12 +313,16 @@ public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease)
360313
private CompletableFuture<Boolean> markIndexReadableSingleTarget(Index index, AtomicBoolean anythingChanged,
361314
AtomicReference<RuntimeException> runtimeExceptionAtomicReference) {
362315
// An extension function to reduce markIndexReadable's complexity
363-
return common.getNonSynchronizedRunner().runAsync(context ->
316+
return common.getRunner().runAsync(context ->
364317
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
365-
.thenCompose(store ->
366-
policy.shouldAllowUniquePendingState(store) ?
367-
store.markIndexReadableOrUniquePending(index) :
368-
store.markIndexReadable(index))
318+
.thenCompose(store -> {
319+
if (heartbeat != null) {
320+
heartbeat.clearHeartbeat(store, index);
321+
}
322+
return policy.shouldAllowUniquePendingState(store) ?
323+
store.markIndexReadableOrUniquePending(index) :
324+
store.markIndexReadable(index);
325+
})
369326
).handle((changed, ex) -> {
370327
if (ex == null) {
371328
if (Boolean.TRUE.equals(changed)) {
@@ -388,6 +345,7 @@ public void enforceStampOverwrite() {
388345
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
389346
// continuedBuild is set if this session isn't a continuation of a previous indexing
390347
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);
348+
heartbeat = new IndexingHeartbeat(common.getUuid(), indexingTypeStamp.getMethod());
391349

392350
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp));
393351
}
@@ -428,21 +386,6 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
428386
}
429387
// Here: check if type conversion is allowed
430388
if (continuedBuild && shouldAllowTypeConversionContinue(newStamp, savedStamp)) {
431-
// Special case: partly built by another indexing method, but may be continued with the current one
432-
if (savedStamp.getMethod().equals(IndexBuildProto.IndexBuildIndexingStamp.Method.MULTI_TARGET_BY_RECORDS)) {
433-
// Here: throw an exception if there is an active multi-target session that includes this index
434-
final String otherPrimaryIndexName = savedStamp.getTargetIndex(0);
435-
if (!otherPrimaryIndexName.equals(common.getPrimaryIndex().getName())) {
436-
// Note: For protection, avoid breaking an active multi-target session. This leads to a certain
437-
// inconsistency for buildIndex that is called with a false `useSyncLock` - sync lock will be
438-
// checked during a method conversion, but not during a simple "same method" continue.
439-
return throwIfSyncedLock(otherPrimaryIndexName, store, newStamp, savedStamp)
440-
.thenCompose(ignore -> {
441-
store.saveIndexingTypeStamp(index, newStamp);
442-
return AsyncUtil.DONE;
443-
});
444-
}
445-
}
446389
store.saveIndexingTypeStamp(index, newStamp);
447390
return AsyncUtil.DONE;
448391
}
@@ -476,23 +419,6 @@ private static IndexBuildProto.IndexBuildIndexingStamp blocklessStampOf(IndexBui
476419
.build();
477420
}
478421

479-
CompletableFuture<Void> throwIfSyncedLock(String otherIndexName, FDBRecordStore store, IndexBuildProto.IndexBuildIndexingStamp newStamp, IndexBuildProto.IndexBuildIndexingStamp savedStamp) {
480-
final Index otherIndex = store.getRecordMetaData().getIndex(otherIndexName);
481-
final Subspace mainLockSubspace = IndexingSubspaces.indexBuildLockSubspace(store, otherIndex);
482-
return SynchronizedSession.checkActiveSessionExists(store.ensureContextActive(), mainLockSubspace)
483-
.thenApply(hasActiveSession -> {
484-
if (Boolean.TRUE.equals(hasActiveSession)) {
485-
throw new SynchronizedSessionLockedException("Failed to takeover indexing while part of a multi-target with an existing session in progress")
486-
.addLogInfo(LogMessageKeys.SUBSPACE, mainLockSubspace)
487-
.addLogInfo(LogMessageKeys.PRIMARY_INDEX, otherIndexName)
488-
.addLogInfo(LogMessageKeys.EXPECTED, PartlyBuiltException.stampToString(newStamp))
489-
.addLogInfo(LogMessageKeys.ACTUAL, PartlyBuiltException.stampToString(savedStamp));
490-
}
491-
return null;
492-
});
493-
494-
}
495-
496422
@Nonnull
497423
private CompletableFuture<Void> throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned,
498424
FDBRecordStore store,
@@ -885,21 +811,43 @@ private CompletableFuture<Boolean> hadTransactionReachedLimits(FDBRecordStore st
885811
}
886812

887813
private CompletableFuture<Void> validateTypeStamp(@Nonnull FDBRecordStore store) {
814+
if (shouldValidate()) {
815+
// check other heartbeats (if exclusive) & typestamp
816+
final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store);
817+
return forEachTargetIndex(index -> CompletableFuture.allOf(
818+
updateHeartbeat(true, store, index),
819+
store.loadIndexingTypeStampAsync(index)
820+
.thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index))
821+
));
822+
} else {
823+
// update only
824+
return forEachTargetIndex(index -> updateHeartbeat(false, store, index));
825+
}
826+
}
827+
828+
private CompletableFuture<Void> updateHeartbeat(boolean validate, FDBRecordStore store, Index index) {
829+
if (heartbeat != null) {
830+
if (validate) {
831+
return heartbeat.checkAndUpdateHeartbeat(store, index);
832+
}
833+
heartbeat.updateHeartbeat(store, index);
834+
}
835+
return AsyncUtil.DONE;
836+
}
837+
838+
private boolean shouldValidate() {
888839
final long minimalInterval = policy.getCheckIndexingMethodFrequencyMilliseconds();
889840
if (minimalInterval < 0 || isScrubber) {
890-
return AsyncUtil.DONE;
841+
return false;
891842
}
892843
if (minimalInterval > 0) {
893844
final long now = System.currentTimeMillis();
894845
if (now < lastTypeStampCheckMillis + minimalInterval) {
895-
return AsyncUtil.DONE;
846+
return false;
896847
}
897848
lastTypeStampCheckMillis = now;
898849
}
899-
final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store);
900-
return forEachTargetIndex(index ->
901-
store.loadIndexingTypeStampAsync(index)
902-
.thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index)));
850+
return true;
903851
}
904852

905853
private void validateTypeStamp(final IndexBuildProto.IndexBuildIndexingStamp typeStamp,

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingCommon.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.apple.foundationdb.record.metadata.Index;
2828
import com.apple.foundationdb.record.metadata.MetaDataException;
2929
import com.apple.foundationdb.record.metadata.RecordType;
30-
import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner;
3130
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner;
3231
import com.apple.foundationdb.tuple.Tuple;
3332

@@ -50,10 +49,10 @@
5049

5150
@API(API.Status.INTERNAL)
5251
public class IndexingCommon {
52+
// TODO? get uuid from caller to allow lock takeover
5353
private final UUID uuid = UUID.randomUUID();
5454

5555
@Nonnull private final FDBDatabaseRunner runner;
56-
@Nullable private SynchronizedSessionRunner synchronizedSessionRunner = null;
5756

5857
@Nonnull private final FDBRecordStore.Builder recordStoreBuilder;
5958
@Nonnull private final AtomicLong totalRecordsScanned;
@@ -176,11 +175,6 @@ private void logIf(boolean condition, List<Object> list, @Nonnull Object... a) {
176175

177176
@Nonnull
178177
public FDBDatabaseRunner getRunner() {
179-
return synchronizedSessionRunner == null ? runner : synchronizedSessionRunner;
180-
}
181-
182-
@Nonnull
183-
public FDBDatabaseRunner getNonSynchronizedRunner() {
184178
return runner;
185179
}
186180

@@ -258,15 +252,6 @@ public FDBRecordStore.Builder getRecordStoreBuilder() {
258252
return recordStoreBuilder;
259253
}
260254

261-
@Nullable
262-
public SynchronizedSessionRunner getSynchronizedSessionRunner() {
263-
return synchronizedSessionRunner;
264-
}
265-
266-
public void setSynchronizedSessionRunner(@Nullable final SynchronizedSessionRunner synchronizedSessionRunner) {
267-
this.synchronizedSessionRunner = synchronizedSessionRunner;
268-
}
269-
270255
@Nonnull
271256
public AtomicLong getTotalRecordsScanned() {
272257
return totalRecordsScanned;
@@ -287,8 +272,5 @@ public boolean loadConfig() {
287272

288273
public void close() {
289274
runner.close();
290-
if (synchronizedSessionRunner != null) {
291-
synchronizedSessionRunner.close();
292-
}
293275
}
294276
}

0 commit comments

Comments
 (0)