diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java index 78a8987220..090cce3088 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java @@ -58,6 +58,7 @@ public enum LogMessageKeys { TRANSACTION_ID, TRANSACTION_NAME, AGE_SECONDS, + AGE_MILLISECONDS, CONSTITUENT, TOTAL_MICROS, // record splitting/unsplitting @@ -162,7 +163,7 @@ public enum LogMessageKeys { RECORDS_PER_SECOND, DOCUMENT, SESSION_ID, - INDEXER_SESSION_ID, + EXISTING_INDEXER_ID, INDEXER_ID, INDEX_STATE_PRECONDITION, INITIAL_INDEX_STATE, diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java index 23dd1f17f6..156b0e5377 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java @@ -4966,6 +4966,7 @@ public void removeFormerIndex(FormerIndex formerIndex) { private void clearReadableIndexBuildData(Index index) { IndexingRangeSet.forIndexBuild(this, index).clear(); + IndexingHeartbeat.clearAllHeartbeats(this, index); } @SuppressWarnings("PMD.CloseResource") diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBStoreTimer.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBStoreTimer.java index 07d1af133d..29bf3a5b76 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBStoreTimer.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBStoreTimer.java @@ -413,6 +413,10 @@ public enum Waits implements Wait { WAIT_INDEX_OPERATION("wait for index operation"), /** Wait for indexing type stamp operation. */ WAIT_INDEX_TYPESTAMP_OPERATION("wait for indexing type stamp operation"), + /** Wait for clearing indexing heartbeats. */ + WAIT_INDEX_CLEAR_HEARTBEATS("Wait for clearing indexing heartbeats"), + /** Wait for reading indexing heartbeats. */ + WAIT_INDEX_READ_HEARTBEATS("Wait for reading indexing heartbeats"), /** Wait for adding an index. */ WAIT_ADD_INDEX("wait for adding an index"), /** Wait for dropping an index. */ diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java index 4371c1d230..adc29115ca 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java @@ -45,13 +45,10 @@ import com.apple.foundationdb.record.provider.common.StoreTimer; import com.apple.foundationdb.record.provider.common.StoreTimerSnapshot; import com.apple.foundationdb.record.provider.foundationdb.indexing.IndexingRangeSet; -import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner; import com.apple.foundationdb.record.query.plan.RecordQueryPlanner; import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordFromStoredRecordPlan; import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner; import com.apple.foundationdb.subspace.Subspace; -import com.apple.foundationdb.synchronizedsession.SynchronizedSession; -import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException; import com.apple.foundationdb.tuple.ByteArrayUtil2; import com.apple.foundationdb.tuple.Tuple; import com.google.protobuf.Message; @@ -81,7 +78,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -106,6 +102,7 @@ public abstract class IndexingBase { private final long startingTimeMillis; private long lastTypeStampCheckMillis; private Map indexingMergerMap = null; + private IndexingHeartbeat heartbeat = null; // this will stay null for index scrubbing IndexingBase(@Nonnull IndexingCommon common, @Nonnull OnlineIndexer.IndexingPolicy policy) { @@ -153,74 +150,47 @@ protected CompletableFuture> recordIfInIndexedTypes(FDB // buildIndexAsync - the main indexing function. Builds and commits indexes asynchronously; throttling to avoid overloading the system. @SuppressWarnings("PMD.CloseResource") - public CompletableFuture buildIndexAsync(boolean markReadable, boolean useSyncLock) { + public CompletableFuture buildIndexAsync(boolean markReadable) { KeyValueLogMessage message = KeyValueLogMessage.build("build index online", - LogMessageKeys.SHOULD_MARK_READABLE, markReadable); + LogMessageKeys.SHOULD_MARK_READABLE, markReadable, + LogMessageKeys.INDEXER_ID, common.getIndexerId()); long startNanos = System.nanoTime(); - final CompletableFuture buildIndexAsyncFuture; FDBDatabaseRunner runner = getRunner(); - Index index = common.getPrimaryIndex(); - if (runner.getTimer() != null) { - lastProgressSnapshot = StoreTimerSnapshot.from(runner.getTimer()); + final FDBStoreTimer timer = runner.getTimer(); + if ( timer != null) { + lastProgressSnapshot = StoreTimerSnapshot.from(timer); } - if (useSyncLock) { - buildIndexAsyncFuture = runner - .runAsync(context -> openRecordStore(context).thenApply(store -> IndexingSubspaces.indexBuildLockSubspace(store, index)), - common.indexLogMessageKeyValues("IndexingBase::indexBuildLockSubspace")) - .thenCompose(lockSubspace -> runner.startSynchronizedSessionAsync(lockSubspace, common.config.getLeaseLengthMillis())) - .thenCompose(synchronizedRunner -> { - message.addKeyAndValue(LogMessageKeys.SESSION_ID, synchronizedRunner.getSessionId()); - return runWithSynchronizedRunnerAndEndSession(synchronizedRunner, - () -> handleStateAndDoBuildIndexAsync(markReadable, message)); - }); - } else { - message.addKeyAndValue(LogMessageKeys.SESSION_ID, "none"); - common.setSynchronizedSessionRunner(null); - buildIndexAsyncFuture = handleStateAndDoBuildIndexAsync(markReadable, message); - } - return buildIndexAsyncFuture.whenComplete((vignore, ex) -> { - message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build - .addKeysAndValues(common.indexLogMessageKeyValues()) - .addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos)); - if (LOGGER.isWarnEnabled() && (ex != null)) { - message.addKeyAndValue(LogMessageKeys.RESULT, "failure"); - message.addKeysAndValues(throttle.logMessageKeyValues()); // this "last attempt" snapshot information can help debugging - LOGGER.warn(message.toString(), ex); - } else if (LOGGER.isInfoEnabled()) { - message.addKeyAndValue(LogMessageKeys.RESULT, "success"); - LOGGER.info(message.toString()); - } - }); - } - - @SuppressWarnings("PMD.CloseResource") - private CompletableFuture runWithSynchronizedRunnerAndEndSession( - @Nonnull SynchronizedSessionRunner newSynchronizedRunner, @Nonnull Supplier> runnable) { - final SynchronizedSessionRunner currentSynchronizedRunner1 = common.getSynchronizedSessionRunner(); - if (currentSynchronizedRunner1 == null) { - common.setSynchronizedSessionRunner(newSynchronizedRunner); - return MoreAsyncUtil.composeWhenComplete(runnable.get(), (result, ex) -> { - final SynchronizedSessionRunner currentSynchronizedRunner2 = common.getSynchronizedSessionRunner(); - if (newSynchronizedRunner.equals(currentSynchronizedRunner2)) { - common.setSynchronizedSessionRunner(null); - } else { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn(KeyValueLogMessage.build("synchronizedSessionRunner was modified during the run", - LogMessageKeys.SESSION_ID, newSynchronizedRunner.getSessionId(), - LogMessageKeys.INDEXER_SESSION_ID, currentSynchronizedRunner2 == null ? null : currentSynchronizedRunner2.getSessionId()) - .addKeysAndValues(common.indexLogMessageKeyValues()) - .toString()); + AtomicReference indexingException = new AtomicReference<>(null); + return handleStateAndDoBuildIndexAsync(markReadable, message) + .handle((ret, ex) -> { + if (ex != null) { + indexingException.set(ex); } - } - return newSynchronizedRunner.endSessionAsync(); - }, getRunner().getDatabase()::mapAsyncToSyncException); - } else { - return newSynchronizedRunner.endSessionAsync().thenApply(vignore -> { - throw new RecordCoreException("another synchronized session is running on the indexer", - LogMessageKeys.SESSION_ID, newSynchronizedRunner.getSessionId(), - LogMessageKeys.INDEXER_SESSION_ID, currentSynchronizedRunner1.getSessionId()); - }); - } + message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build + .addKeysAndValues(common.indexLogMessageKeyValues()) + .addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos)); + if (LOGGER.isWarnEnabled() && (ex != null)) { + message.addKeyAndValue(LogMessageKeys.RESULT, "failure"); + message.addKeysAndValues(throttle.logMessageKeyValues()); // this "last attempt" snapshot information can help debugging + LOGGER.warn(message.toString(), ex); + } else if (LOGGER.isInfoEnabled()) { + message.addKeyAndValue(LogMessageKeys.RESULT, "success"); + LOGGER.info(message.toString()); + } + return ret; + }) + // Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in + // these dedicated transaction. Heartbeat clearing is not a blocker but a "best effort" operation. + .thenCompose(ignore -> clearHeartbeats()) + .handle((ignore, exIgnore) -> { + Throwable ex = indexingException.get(); + if (ex instanceof RuntimeException) { + throw (RuntimeException) ex; + } else if (ex != null) { + throw new RuntimeException(ex); + } + return null; + }); } abstract List indexingLogMessageKeyValues(); @@ -314,7 +284,7 @@ private CompletableFuture markIndexesWriteOnly(boolean continueBuild, FDBR @Nonnull public CompletableFuture markReadableIfBuilt() { AtomicBoolean allReadable = new AtomicBoolean(true); - return common.getNonSynchronizedRunner().runAsync(context -> openRecordStore(context).thenCompose(store -> + return getRunner().runAsync(context -> openRecordStore(context).thenCompose(store -> forEachTargetIndex(index -> { if (store.isIndexReadable(index)) { return AsyncUtil.DONE; @@ -353,6 +323,7 @@ public CompletableFuture markIndexReadable(boolean markReadablePlease) if (ex != null) { throw ex; } + heartbeat = null; // Here: heartbeats had been successfully cleared. No need to clear again return anythingChanged.get(); }); } @@ -360,12 +331,14 @@ public CompletableFuture markIndexReadable(boolean markReadablePlease) private CompletableFuture markIndexReadableSingleTarget(Index index, AtomicBoolean anythingChanged, AtomicReference runtimeExceptionAtomicReference) { // An extension function to reduce markIndexReadable's complexity - return common.getNonSynchronizedRunner().runAsync(context -> + return getRunner().runAsync(context -> common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync() - .thenCompose(store -> - policy.shouldAllowUniquePendingState(store) ? - store.markIndexReadableOrUniquePending(index) : - store.markIndexReadable(index)) + .thenCompose(store -> { + clearHeartbeatSingleTarget(store, index); + return policy.shouldAllowUniquePendingState(store) ? + store.markIndexReadableOrUniquePending(index) : + store.markIndexReadable(index); + }) ).handle((changed, ex) -> { if (ex == null) { if (Boolean.TRUE.equals(changed)) { @@ -388,8 +361,14 @@ public void enforceStampOverwrite() { private CompletableFuture setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) { // continuedBuild is set if this session isn't a continuation of a previous indexing IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store); + final IndexBuildProto.IndexBuildIndexingStamp.Method method = indexingTypeStamp.getMethod(); + boolean allowMutual = + method == IndexBuildProto.IndexBuildIndexingStamp.Method.MUTUAL_BY_RECORDS || + method == IndexBuildProto.IndexBuildIndexingStamp.Method.SCRUB_REPAIR; + heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod().toString(), common.config.getLeaseLengthMillis(), allowMutual); - return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp)); + return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp) + .thenCompose(ignore -> updateHeartbeat(true, store, index))); } @Nonnull @@ -397,7 +376,7 @@ private CompletableFuture setIndexingTypeOrThrow(FDBRecordStore store, boo if (forceStampOverwrite && !continuedBuild) { // Fresh session + overwrite = no questions asked store.saveIndexingTypeStamp(index, newStamp); - return AsyncUtil.DONE; + return AsyncUtil.DONE ; } return store.loadIndexingTypeStampAsync(index) .thenCompose(savedStamp -> { @@ -428,21 +407,6 @@ private CompletableFuture setIndexingTypeOrThrow(FDBRecordStore store, boo } // Here: check if type conversion is allowed if (continuedBuild && shouldAllowTypeConversionContinue(newStamp, savedStamp)) { - // Special case: partly built by another indexing method, but may be continued with the current one - if (savedStamp.getMethod().equals(IndexBuildProto.IndexBuildIndexingStamp.Method.MULTI_TARGET_BY_RECORDS)) { - // Here: throw an exception if there is an active multi-target session that includes this index - final String otherPrimaryIndexName = savedStamp.getTargetIndex(0); - if (!otherPrimaryIndexName.equals(common.getPrimaryIndex().getName())) { - // Note: For protection, avoid breaking an active multi-target session. This leads to a certain - // inconsistency for buildIndex that is called with a false `useSyncLock` - sync lock will be - // checked during a method conversion, but not during a simple "same method" continue. - return throwIfSyncedLock(otherPrimaryIndexName, store, newStamp, savedStamp) - .thenCompose(ignore -> { - store.saveIndexingTypeStamp(index, newStamp); - return AsyncUtil.DONE; - }); - } - } store.saveIndexingTypeStamp(index, newStamp); return AsyncUtil.DONE; } @@ -476,23 +440,6 @@ private static IndexBuildProto.IndexBuildIndexingStamp blocklessStampOf(IndexBui .build(); } - CompletableFuture throwIfSyncedLock(String otherIndexName, FDBRecordStore store, IndexBuildProto.IndexBuildIndexingStamp newStamp, IndexBuildProto.IndexBuildIndexingStamp savedStamp) { - final Index otherIndex = store.getRecordMetaData().getIndex(otherIndexName); - final Subspace mainLockSubspace = IndexingSubspaces.indexBuildLockSubspace(store, otherIndex); - return SynchronizedSession.checkActiveSessionExists(store.ensureContextActive(), mainLockSubspace) - .thenApply(hasActiveSession -> { - if (Boolean.TRUE.equals(hasActiveSession)) { - throw new SynchronizedSessionLockedException("Failed to takeover indexing while part of a multi-target with an existing session in progress") - .addLogInfo(LogMessageKeys.SUBSPACE, mainLockSubspace) - .addLogInfo(LogMessageKeys.PRIMARY_INDEX, otherIndexName) - .addLogInfo(LogMessageKeys.EXPECTED, PartlyBuiltException.stampToString(newStamp)) - .addLogInfo(LogMessageKeys.ACTUAL, PartlyBuiltException.stampToString(savedStamp)); - } - return null; - }); - - } - @Nonnull private CompletableFuture throwAsByRecordsUnlessNoRecordWasScanned(boolean noRecordScanned, FDBRecordStore store, @@ -562,7 +509,7 @@ RecordCoreException newPartlyBuiltException(boolean continuedBuild, IndexBuildProto.IndexBuildIndexingStamp savedStamp, IndexBuildProto.IndexBuildIndexingStamp expectedStamp, Index index) { - return new PartlyBuiltException(savedStamp, expectedStamp, index, common.getUuid(), + return new PartlyBuiltException(savedStamp, expectedStamp, index, common.getIndexerId(), savedStamp.getBlock() ? "This index was partly built, and blocked" : "This index was partly built by another method"); @@ -595,7 +542,7 @@ protected CompletableFuture doneOrThrottleDelayAndMaybeLogProgress(bool validateTimeLimit(toWait); - CompletableFuture delay = MoreAsyncUtil.delayedFuture(toWait, TimeUnit.MILLISECONDS, common.getRunner().getScheduledExecutor()).thenApply(vignore3 -> true); + CompletableFuture delay = MoreAsyncUtil.delayedFuture(toWait, TimeUnit.MILLISECONDS, getRunner().getScheduledExecutor()).thenApply(vignore3 -> true); if (getRunner().getTimer() != null) { delay = getRunner().getTimer().instrument(FDBStoreTimer.Events.INDEXER_DELAY, delay, getRunner().getExecutor()); } @@ -885,21 +832,75 @@ private CompletableFuture hadTransactionReachedLimits(FDBRecordStore st } private CompletableFuture validateTypeStamp(@Nonnull FDBRecordStore store) { + if (shouldValidate()) { + // check other heartbeats (if exclusive) & typestamp + final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store); + return forEachTargetIndex(index -> CompletableFuture.allOf( + updateHeartbeat(true, store, index), + store.loadIndexingTypeStampAsync(index) + .thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index)) + )); + } else { + // update only + return forEachTargetIndex(index -> updateHeartbeat(false, store, index)); + } + } + + private CompletableFuture updateHeartbeat(boolean validate, FDBRecordStore store, Index index) { + if (heartbeat != null) { + if (validate) { + return heartbeat.checkAndUpdateHeartbeat(store, index); + } + heartbeat.updateHeartbeat(store, index); + } + return AsyncUtil.DONE; + } + + private CompletableFuture clearHeartbeats() { + if (heartbeat == null) { + return AsyncUtil.DONE; + } + return forEachTargetIndex(this::clearHeartbeatSingleTarget) + .thenAccept(ignore -> heartbeat = null); + } + + private void clearHeartbeats(FDBRecordStore store) { + if (heartbeat != null) { + for (Index index : common.getTargetIndexes()) { + clearHeartbeatSingleTarget(store, index); + } + } + } + + private CompletableFuture clearHeartbeatSingleTarget(Index index) { + return getRunner().runAsync(context -> + common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync() + .thenApply(store -> { + clearHeartbeatSingleTarget(store, index); + return null; + })); + } + + private void clearHeartbeatSingleTarget(FDBRecordStore store, Index index) { + if (heartbeat != null) { + heartbeat.clearHeartbeat(store, index); + } + } + + + private boolean shouldValidate() { final long minimalInterval = policy.getCheckIndexingMethodFrequencyMilliseconds(); if (minimalInterval < 0 || isScrubber) { - return AsyncUtil.DONE; + return false; } if (minimalInterval > 0) { final long now = System.currentTimeMillis(); if (now < lastTypeStampCheckMillis + minimalInterval) { - return AsyncUtil.DONE; + return false; } lastTypeStampCheckMillis = now; } - final IndexBuildProto.IndexBuildIndexingStamp expectedTypeStamp = getIndexingTypeStamp(store); - return forEachTargetIndex(index -> - store.loadIndexingTypeStampAsync(index) - .thenAccept(typeStamp -> validateTypeStamp(typeStamp, expectedTypeStamp, index))); + return true; } private void validateTypeStamp(final IndexBuildProto.IndexBuildIndexingStamp typeStamp, @@ -911,7 +912,7 @@ private void validateTypeStamp(final IndexBuildProto.IndexBuildIndexingStamp typ } if (typeStamp == null || typeStamp.getMethod() != expectedTypeStamp.getMethod() || isTypeStampBlocked(typeStamp)) { throw new PartlyBuiltException(typeStamp, expectedTypeStamp, - index, common.getUuid(), "Indexing stamp had changed"); + index, common.getIndexerId(), "Indexing stamp had changed"); } } @@ -1036,7 +1037,9 @@ public CompletableFuture rebuildIndexAsync(@Nonnull FDBRecordStore store) return rangeSet.insertRangeAsync(null, null); })) .thenCompose(vignore -> setIndexingTypeOrThrow(store, false)) - .thenCompose(vignore -> rebuildIndexInternalAsync(store)); + .thenCompose(vignore -> rebuildIndexInternalAsync(store)) + // If any of the indexes' heartbeats, for any reason, was not cleared during "mark readable", clear it here + .whenComplete((ignore, ignoreEx) -> clearHeartbeats(store)); } abstract CompletableFuture rebuildIndexInternalAsync(FDBRecordStore store); @@ -1046,7 +1049,7 @@ protected void validateOrThrowEx(boolean isValid, @Nonnull String msg) { throw new ValidationException(msg, LogMessageKeys.INDEX_NAME, common.getTargetIndexesNames(), LogMessageKeys.SOURCE_INDEX, policy.getSourceIndex(), - LogMessageKeys.INDEXER_ID, common.getUuid()); + LogMessageKeys.INDEXER_ID, common.getIndexerId()); } } @@ -1107,6 +1110,16 @@ boolean performIndexingStampOperation(@Nonnull ConcurrentHashMap> getIndexingHeartbeats(int maxCount) { + return getRunner().runAsync(context -> openRecordStore(context) + .thenCompose(store -> IndexingHeartbeat.getIndexingHeartbeats(store, common.getPrimaryIndex(), maxCount))); + } + + public CompletableFuture clearIndexingHeartbeats(long minAgenMilliseconds, int maxIteration) { + return getRunner().runAsync(context -> openRecordStore(context) + .thenCompose(store -> IndexingHeartbeat.clearIndexingHeartbeats(store, common.getPrimaryIndex(), minAgenMilliseconds, maxIteration))); + } + /** * Thrown when the indexing process fails to meet a precondition. */ diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingByIndex.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingByIndex.java index e2a85202df..d76916be57 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingByIndex.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingByIndex.java @@ -103,7 +103,7 @@ private Index getSourceIndex(RecordMetaData metaData) { throw new ValidationException("no source index", LogMessageKeys.INDEX_NAME, common.getIndex().getName(), LogMessageKeys.SOURCE_INDEX, policy.getSourceIndex(), - LogMessageKeys.INDEXER_ID, common.getUuid()); + LogMessageKeys.INDEXER_ID, common.getIndexerId()); } @Nonnull diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingCommon.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingCommon.java index 9bf8066786..4b9b44e3fa 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingCommon.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingCommon.java @@ -27,7 +27,6 @@ import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.metadata.MetaDataException; import com.apple.foundationdb.record.metadata.RecordType; -import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner; import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner; import com.apple.foundationdb.tuple.Tuple; @@ -50,10 +49,9 @@ @API(API.Status.INTERNAL) public class IndexingCommon { - private final UUID uuid = UUID.randomUUID(); + private final UUID indexerId = UUID.randomUUID(); @Nonnull private final FDBDatabaseRunner runner; - @Nullable private SynchronizedSessionRunner synchronizedSessionRunner = null; @Nonnull private final FDBRecordStore.Builder recordStoreBuilder; @Nonnull private final AtomicLong totalRecordsScanned; @@ -137,8 +135,8 @@ private void fillTargetIndexers(@Nonnull List targetIndexes, @Nullable Co } } - public UUID getUuid() { - return uuid; + public UUID getIndexerId() { + return indexerId; } public List indexLogMessageKeyValues() { @@ -158,7 +156,7 @@ public List indexLogMessageKeyValues(@Nullable String transactionName, @ logIf(true, keyValues, LogMessageKeys.TARGET_INDEX_NAME, getTargetIndexesNames(), LogMessageKeys.RECORDS_SCANNED, totalRecordsScanned.get(), - LogMessageKeys.INDEXER_ID, uuid); + LogMessageKeys.INDEXER_ID, indexerId); if (moreKeyValues != null && !moreKeyValues.isEmpty()) { keyValues.addAll(moreKeyValues); @@ -176,11 +174,6 @@ private void logIf(boolean condition, List list, @Nonnull Object... a) { @Nonnull public FDBDatabaseRunner getRunner() { - return synchronizedSessionRunner == null ? runner : synchronizedSessionRunner; - } - - @Nonnull - public FDBDatabaseRunner getNonSynchronizedRunner() { return runner; } @@ -258,15 +251,6 @@ public FDBRecordStore.Builder getRecordStoreBuilder() { return recordStoreBuilder; } - @Nullable - public SynchronizedSessionRunner getSynchronizedSessionRunner() { - return synchronizedSessionRunner; - } - - public void setSynchronizedSessionRunner(@Nullable final SynchronizedSessionRunner synchronizedSessionRunner) { - this.synchronizedSessionRunner = synchronizedSessionRunner; - } - @Nonnull public AtomicLong getTotalRecordsScanned() { return totalRecordsScanned; @@ -287,8 +271,5 @@ public boolean loadConfig() { public void close() { runner.close(); - if (synchronizedSessionRunner != null) { - synchronizedSessionRunner.close(); - } } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java new file mode 100644 index 0000000000..fa0b050fcd --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java @@ -0,0 +1,184 @@ +/* + * IndexingHeartbeat.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb; + +import com.apple.foundationdb.KeyValue; +import com.apple.foundationdb.async.AsyncIterator; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.record.IndexBuildProto; +import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.metadata.Index; +import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException; +import com.google.protobuf.InvalidProtocolBufferException; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +public class IndexingHeartbeat { + // [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time] + final UUID indexerId; + final String info; + final long genesisTimeMilliseconds; + final long leaseLength; + final boolean allowMutual; + + public IndexingHeartbeat(final UUID indexerId, String info, long leaseLength, boolean allowMutual) { + this.indexerId = indexerId; + this.info = info; + this.leaseLength = leaseLength; + this.allowMutual = allowMutual; + this.genesisTimeMilliseconds = nowMilliseconds(); + } + + public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) { + byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack(); + byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder() + .setInfo(info) + .setGenesisTimeMilliseconds(genesisTimeMilliseconds) + .setHeartbeatTimeMilliseconds(nowMilliseconds()) + .build().toByteArray(); + store.ensureContextActive().set(key, value); + } + + public CompletableFuture checkAndUpdateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) { + // complete exceptionally if non-mutual, other exists + if (allowMutual) { + updateHeartbeat(store, index); + return AsyncUtil.DONE; + } + + final AsyncIterator iterator = heartbeatsIterator(store, index); + final long now = nowMilliseconds(); + return AsyncUtil.whileTrue(() -> iterator.onHasNext() + .thenApply(hasNext -> { + if (!hasNext) { + return false; + } + final KeyValue kv = iterator.next(); + try { + final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey()); + if (!otherIndexerId.equals(this.indexerId)) { + final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue()); + final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds(); + if (age > 0 && age < leaseLength) { + // For practical reasons, this exception is backward compatible to the Synchronized Lock one + throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress") + .addLogInfo(LogMessageKeys.INDEXER_ID, indexerId) + .addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId) + .addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age) + .addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength); + } + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return true; + })) + .thenApply(ignore -> { + updateHeartbeat(store, index); + return null; + }); + } + + public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) { + store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack()); + } + + public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) { + store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index).range()); + } + + public static CompletableFuture> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) { + final Map ret = new HashMap<>(); + final AsyncIterator iterator = heartbeatsIterator(store, index); + final AtomicInteger iterationCount = new AtomicInteger(0); + return AsyncUtil.whileTrue(() -> iterator.onHasNext() + .thenApply(hasNext -> { + if (!hasNext) { + return false; + } + if (maxCount > 0 && maxCount < iterationCount.incrementAndGet()) { + return false; + } + final KeyValue kv = iterator.next(); + final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey()); + try { + final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue()); + ret.put(otherIndexerId, otherHeartbeat); + } catch (InvalidProtocolBufferException e) { + // Let the caller know about this invalid heartbeat. + ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder() + .setInfo("<< Invalid Heartbeat >>") + .build()); + } + return true; + })) + .thenApply(ignore -> ret); + } + + public static CompletableFuture clearIndexingHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index, long minAgenMilliseconds, int maxIteration) { + final AsyncIterator iterator = heartbeatsIterator(store, index); + final AtomicInteger deleteCount = new AtomicInteger(0); + final AtomicInteger iterationCount = new AtomicInteger(0); + final long now = nowMilliseconds(); + return AsyncUtil.whileTrue(() -> iterator.onHasNext() + .thenApply(hasNext -> { + if (!hasNext) { + return false; + } + if (maxIteration > 0 && maxIteration < iterationCount.incrementAndGet()) { + return false; + } + final KeyValue kv = iterator.next(); + boolean shouldRemove; + try { + final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue()); + // remove heartbeat if too old + shouldRemove = now + minAgenMilliseconds >= otherHeartbeat.getHeartbeatTimeMilliseconds(); + } catch (InvalidProtocolBufferException e) { + // remove heartbeat if invalid + shouldRemove = true; + } + if (shouldRemove) { + store.ensureContextActive().clear(kv.getKey()); + deleteCount.incrementAndGet(); + } + return true; + })) + .thenApply(ignore -> deleteCount.get()); + } + + private static AsyncIterator heartbeatsIterator(FDBRecordStore store, Index index) { + return store.getContext().ensureActive().snapshot().getRange(IndexingSubspaces.indexheartbeatSubspace(store, index).range()).iterator(); + } + + private static UUID heartbeatKeyToIndexerId(FDBRecordStore store, Index index, byte[] key) { + return IndexingSubspaces.indexheartbeatSubspace(store, index).unpack(key).getUUID(0); + } + + private static long nowMilliseconds() { + return System.currentTimeMillis(); + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java index 1269541d43..043c70fe20 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java @@ -77,7 +77,7 @@ CompletableFuture mergeIndex(@Nullable SubspaceProvider subspaceProvider) // Merge operation may take a long time, hence the runner's context must be a read-only. Ensure that it // isn't a synchronized one, which may attempt a heartbeat write // Note: this runAsync will retry according to the runner's "maxAttempts" setting - common.getNonSynchronizedRunner().runAsync(context -> openRecordStore(context) + common.getRunner().runAsync(context -> openRecordStore(context) .thenCompose(store -> { mergeStartTime.set(System.nanoTime()); final IndexDeferredMaintenanceControl mergeControl = store.getIndexDeferredMaintenanceControl(); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java index 10b564ec20..acc7363303 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java @@ -27,6 +27,7 @@ import com.apple.foundationdb.tuple.Tuple; import javax.annotation.Nonnull; +import java.util.UUID; /** * List of subspaces related to the indexing/index-scrubbing processes. @@ -40,6 +41,7 @@ public final class IndexingSubspaces { private static final Object INDEX_SCRUBBED_RECORDS_RANGES_ZERO = 4L; private static final Object INDEX_SCRUBBED_RECORDS_RANGES = 5L; private static final Object INDEX_SCRUBBED_INDEX_RANGES = 6L; + private static final Object INDEX_BUILD_HEARTBEAT_PREFIX = 7L; private IndexingSubspaces() { throw new IllegalStateException("Utility class"); @@ -83,6 +85,29 @@ public static Subspace indexBuildTypeSubspace(@Nonnull FDBRecordStoreBase sto return indexBuildSubspace(store, index, INDEX_BUILD_TYPE_VERSION); } + /** + * Subspace that stores the indexing heartbeat. + * @param store store + * @param index index + * @return subspace + */ + @Nonnull + public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase store, @Nonnull Index index) { + return indexBuildSubspace(store, index, INDEX_BUILD_HEARTBEAT_PREFIX); + } + + /** + * Subspace that stores the indexing heartbeat. + * @param store store + * @param index index + * @param indexerId session id + * @return subspace + */ + @Nonnull + public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase store, @Nonnull Index index, @Nonnull UUID indexerId) { + return indexheartbeatSubspace(store, index).subspace(Tuple.from(indexerId)); + } + /** * Subspace that stores scrubbed records ranges of the zero range-id. This subspace is backward compatible * to record ranges scrubbed before range-id was introduced. @@ -184,5 +209,7 @@ public static void eraseAllIndexingDataButTheLock(@Nonnull FDBRecordContext cont eraseAllIndexingScrubbingData(context, store, index); context.clear(Range.startsWith(indexBuildScannedRecordsSubspace(store, index).pack())); context.clear(Range.startsWith(indexBuildTypeSubspace(store, index).pack())); + // The heartbeats, unlike the sync lock, may be erased here. If needed, an appropriate heartbeat will be set after this clear & within the same transaction. + context.clear(Range.startsWith(indexheartbeatSubspace(store, index).pack())); } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationBaseBuilder.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationBaseBuilder.java index 8d44c0a5df..0f4ed46ae1 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationBaseBuilder.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationBaseBuilder.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.record.RecordCoreException; import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataProvider; @@ -715,15 +716,18 @@ public B setTransactionTimeLimitMilliseconds(long timeLimitMilliseconds) { * @see SynchronizedSessionRunner * @param useSynchronizedSession use synchronize session if true, otherwise false * @return this builder + * + * @deprecated Synchronized sessions are now determined by the indexing method. */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public B setUseSynchronizedSession(boolean useSynchronizedSession) { - configBuilder.setUseSynchronizedSession(useSynchronizedSession); return self(); } /** * Set the lease length in milliseconds if the synchronized session is used. The default value is {@link OnlineIndexOperationConfig#DEFAULT_LEASE_LENGTH_MILLIS}. - * @see #setUseSynchronizedSession(boolean) * @see com.apple.foundationdb.synchronizedsession.SynchronizedSession * @param leaseLengthMillis length between last access and lease's end time in milliseconds * @return this builder diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java index 1ce11d342b..9a4ce2f5b1 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java @@ -77,14 +77,13 @@ public class OnlineIndexOperationConfig { private final int increaseLimitAfter; private final long timeLimitMilliseconds; private final long transactionTimeLimitMilliseconds; - private final boolean useSynchronizedSession; private final long leaseLengthMillis; public static final long UNLIMITED_TIME = 0; OnlineIndexOperationConfig(int maxLimit, int initialLimit, int maxRetries, int recordsPerSecond, long progressLogIntervalMillis, int increaseLimitAfter, int maxWriteLimitBytes, long timeLimitMilliseconds, long transactionTimeLimitMilliseconds, - boolean useSynchronizedSession, long leaseLengthMillis) { + long leaseLengthMillis) { this.maxLimit = maxLimit; this.initialLimit = initialLimit; this.maxRetries = maxRetries; @@ -94,7 +93,6 @@ public class OnlineIndexOperationConfig { this.maxWriteLimitBytes = maxWriteLimitBytes; this.timeLimitMilliseconds = timeLimitMilliseconds; this.transactionTimeLimitMilliseconds = transactionTimeLimitMilliseconds; - this.useSynchronizedSession = useSynchronizedSession; this.leaseLengthMillis = leaseLengthMillis; } @@ -188,8 +186,17 @@ public static Builder newBuilder() { return new Builder(); } + + /** + * Not used anymore. + * @return always false; + * @deprecated see {@link Builder#setUseSynchronizedSession(boolean)} + */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public boolean shouldUseSynchronizedSession() { - return useSynchronizedSession; + return false; } public long getLeaseLengthMillis() { @@ -213,7 +220,6 @@ public Builder toBuilder() { .setMaxRetries(this.maxRetries) .setTimeLimitMilliseconds(timeLimitMilliseconds) .setTransactionTimeLimitMilliseconds(this.transactionTimeLimitMilliseconds) - .setUseSynchronizedSession(useSynchronizedSession) .setLeaseLengthMillis(leaseLengthMillis); } @@ -234,7 +240,6 @@ public static class Builder { private long timeLimitMilliseconds = UNLIMITED_TIME; private long transactionTimeLimitMilliseconds = DEFAULT_TRANSACTION_TIME_LIMIT; private long leaseLengthMillis = DEFAULT_LEASE_LENGTH_MILLIS; - private boolean useSynchronizedSession = true; protected Builder() { @@ -492,16 +497,21 @@ public Builder setTransactionTimeLimitMilliseconds(long timeLimitMilliseconds) { * @see SynchronizedSessionRunner * @param useSynchronizedSession use synchronize session if true, otherwise false * @return this builder + * + * @deprecated Synchronized sessions are now determined by the indexing method. */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public Builder setUseSynchronizedSession(boolean useSynchronizedSession) { - this.useSynchronizedSession = useSynchronizedSession; + // no-op return this; } /** - * Set the lease length in milliseconds if the synchronized session is used. By default this is {@link #DEFAULT_LEASE_LENGTH_MILLIS}. - * @see #setUseSynchronizedSession(boolean) - * @see com.apple.foundationdb.synchronizedsession.SynchronizedSession + * If the indexing session is not expected to be mutual, abort indexing if another session is active. This function + * defines the maximum age of another session's heartbeat to be considered an "active session". + * The default value is {@link #DEFAULT_LEASE_LENGTH_MILLIS}. * @param leaseLengthMillis length between last access and lease's end time in milliseconds * @return this builder */ @@ -519,7 +529,7 @@ public Builder setLeaseLengthMillis(long leaseLengthMillis) { public OnlineIndexOperationConfig build() { return new OnlineIndexOperationConfig(maxLimit, initialLimit, maxRetries, recordsPerSecond, progressLogIntervalMillis, increaseLimitAfter, maxWriteLimitBytes, timeLimitMilliseconds, transactionTimeLimitMilliseconds, - useSynchronizedSession, leaseLengthMillis); + leaseLengthMillis); } } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexScrubber.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexScrubber.java index ced1afd22d..f1ed949da3 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexScrubber.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexScrubber.java @@ -75,7 +75,7 @@ private IndexingBase getScrubber(IndexScrubbingTools.ScrubbingType type, AtomicL @Nonnull private CompletableFuture scrubIndexAsync(IndexScrubbingTools.ScrubbingType type, AtomicLong count) { return AsyncUtil.composeHandle( - getScrubber(type, count).buildIndexAsync(false, common.config.shouldUseSynchronizedSession()), + getScrubber(type, count).buildIndexAsync(false), (ignore, ex) -> { if (ex != null) { throw FDBExceptions.wrapException(ex); diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java index 9976bd1930..c0c85ac68d 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -438,7 +439,11 @@ public void mergeIndex() { * the lock. * @return a future that will be ready when the lock is released * @see SynchronizedSession#endAnySession + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #blockIndexBuilds} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public CompletableFuture stopOngoingOnlineIndexBuildsAsync() { return runner.runAsync(context -> openRecordStore(context).thenAccept(recordStore -> stopOngoingOnlineIndexBuilds(recordStore, index)), @@ -447,7 +452,11 @@ public CompletableFuture stopOngoingOnlineIndexBuildsAsync() { /** * Synchronous/blocking version of {@link #stopOngoingOnlineIndexBuildsAsync()}. + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #blockIndexBuilds} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public void stopOngoingOnlineIndexBuilds() { runner.asyncToSync(FDBStoreTimer.Waits.WAIT_STOP_ONLINE_INDEX_BUILD, stopOngoingOnlineIndexBuildsAsync()); } @@ -457,7 +466,11 @@ public void stopOngoingOnlineIndexBuilds() { * the lock. * @param recordStore record store whose index builds need to be stopped * @param index the index whose builds need to be stopped + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #blockIndexBuilds} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public static void stopOngoingOnlineIndexBuilds(@Nonnull FDBRecordStore recordStore, @Nonnull Index index) { SynchronizedSession.endAnySession(recordStore.ensureContextActive(), IndexingSubspaces.indexBuildLockSubspace(recordStore, index)); } @@ -465,7 +478,11 @@ public static void stopOngoingOnlineIndexBuilds(@Nonnull FDBRecordStore recordSt /** * Synchronous/blocking version of {@link #checkAnyOngoingOnlineIndexBuildsAsync()}. * @return true if the index is being built and false otherwise + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #getIndexingHeartbeats(int)} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public boolean checkAnyOngoingOnlineIndexBuilds() { return runner.asyncToSync(FDBStoreTimer.Waits.WAIT_CHECK_ONGOING_ONLINE_INDEX_BUILD, checkAnyOngoingOnlineIndexBuildsAsync()); } @@ -474,7 +491,11 @@ public boolean checkAnyOngoingOnlineIndexBuilds() { * Check if the index is being built by any of the {@link OnlineIndexer}s (only if they use {@link SynchronizedSession}s), * including this {@link OnlineIndexer}. * @return a future that will complete to true if the index is being built and false otherwise + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #getIndexingHeartbeats(int)} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public CompletableFuture checkAnyOngoingOnlineIndexBuildsAsync() { return runner.runAsync(context -> openRecordStore(context).thenCompose(recordStore -> checkAnyOngoingOnlineIndexBuildsAsync(recordStore, index)), @@ -486,26 +507,18 @@ public CompletableFuture checkAnyOngoingOnlineIndexBuildsAsync() { * @param recordStore record store whose index builds need to be checked * @param index the index to check for ongoing index builds * @return a future that will complete to true if the index is being built and false otherwise + * @deprecated synchronized build was replaced. The functionality of this function can be done with {@link #getIndexingHeartbeats(int)} */ + @API(API.Status.DEPRECATED) + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP + @Deprecated(since = "4.4.3.0", forRemoval = true) public static CompletableFuture checkAnyOngoingOnlineIndexBuildsAsync(@Nonnull FDBRecordStore recordStore, @Nonnull Index index) { return SynchronizedSession.checkActiveSessionExists(recordStore.ensureContextActive(), IndexingSubspaces.indexBuildLockSubspace(recordStore, index)); } /** * Builds an index across multiple transactions. - *

- * If it is set to use synchronized sessions, it stops with {@link com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException} - * when there is another runner actively working on the same index. It first checks and updates index states and - * clear index data respecting the {@link IndexStatePrecondition} being set. It then builds the index across - * multiple transactions honoring the rate-limiting parameters set in the constructor of this class. It also retries - * any retriable errors that it encounters while it runs the build. At the end, it marks the index readable in the - * store. - *

- *

- * One may consider to set the index state precondition to {@link IndexStatePrecondition#ERROR_IF_DISABLED_CONTINUE_IF_WRITE_ONLY} - * and {@link OnlineIndexer.Builder#setUseSynchronizedSession(boolean)} to {@code false}, which makes the indexer - * follow the same behavior as before version 2.8.90.0. But it is not recommended. - *

+ * This is a slow and retrying operation that is intended to be executed by background processes. * @return a future that will be ready when the build has completed * @throws com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException the build is stopped * because there may be another build running actively on this index. @@ -518,8 +531,7 @@ public CompletableFuture buildIndexAsync() { @VisibleForTesting @Nonnull CompletableFuture buildIndexAsync(boolean markReadable) { - boolean useSyncLock = (!indexingPolicy.isMutual() || fallbackToRecordsScan) && common.config.shouldUseSynchronizedSession(); - return indexingLauncher(() -> getIndexer().buildIndexAsync(markReadable, useSyncLock)); + return indexingLauncher(() -> getIndexer().buildIndexAsync(markReadable)); } /** @@ -602,6 +614,29 @@ private Map indexingStamp(@Null getIndexer().performIndexingStampOperation(op, id, ttlSeconds)); } + /** + * Get the current indexing heartbeats for a given index (single target or primary index). + * @param maxCount safety valve to limit number items to read. Typically set to zero to keep unlimited. + * @return map of session ids to {@link IndexBuildProto.IndexBuildHeartbeat} + */ + @API(API.Status.EXPERIMENTAL) + public Map getIndexingHeartbeats(int maxCount) { + return asyncToSync(FDBStoreTimer.Waits.WAIT_INDEX_READ_HEARTBEATS, + getIndexer().getIndexingHeartbeats(maxCount)); + } + + /** + * Clear old indexing heartbeats for a given index (single target or primary index). + * @param minAgenMilliseconds minimum heartbeat age (in milliseconds) to clear. + * @param maxIteration safety valve to limit number of items to check. Typically set to zero to keep unlimited + * @return number of cleared heartbeats + */ + @API(API.Status.EXPERIMENTAL) + public int clearIndexingHeartbeats(long minAgenMilliseconds, int maxIteration) { + return asyncToSync(FDBStoreTimer.Waits.WAIT_INDEX_CLEAR_HEARTBEATS, + getIndexer().clearIndexingHeartbeats(minAgenMilliseconds, maxIteration)); + } + /** * Wait for an asynchronous task to complete. This returns the result from the future or propagates * the error if the future completes exceptionally. @@ -763,11 +798,6 @@ public Builder setRecordTypes(@Nullable Collection recordTypes) { * Set how should {@link #buildIndexAsync()} (or its variations) build the index based on its state. Normally * this should be {@link IndexStatePrecondition#BUILD_IF_DISABLED_CONTINUE_BUILD_IF_WRITE_ONLY} if the index is * not corrupted. - *

- * One may consider setting it to {@link IndexStatePrecondition#ERROR_IF_DISABLED_CONTINUE_IF_WRITE_ONLY} and - * {@link #setUseSynchronizedSession(boolean)} to {@code false}, which makes the indexer follow the same behavior - * as before version 2.8.90.0. But it is not recommended. - *

* @see IndexStatePrecondition * @param indexStatePrecondition build option to use * @return this builder @@ -1237,7 +1267,7 @@ public static class Builder { private DesiredAction ifReadable = DesiredAction.CONTINUE; private boolean doAllowUniquePendingState = false; private Set allowedTakeoverSet = null; - private long checkIndexingStampFrequency = 60_000; + private long checkIndexingStampFrequency = 10_000; private boolean useMutualIndexing = false; private List useMutualIndexingBoundaries = null; private boolean allowUnblock = false; @@ -1449,7 +1479,6 @@ public Builder setMutualIndexing() { * by other threads/processes/systems with the exact same parameters, are attempting to concurrently build this * index. To allow that, the indexer will: *
    - *
  1. Avoid the indexing lock - i.e. assume that {@link OnlineIndexer.Builder#setUseSynchronizedSession(boolean)} was called with false
  2. *
  3. Divide the records space to fragments, then iterate the fragments in a way that minimize the interference, while * indexing each fragment independently.
  4. *
  5. Handle indexing conflicts, when occurred.
  6. diff --git a/fdb-record-layer-core/src/main/proto/index_build.proto b/fdb-record-layer-core/src/main/proto/index_build.proto index eff6d2df1b..c7929645cc 100644 --- a/fdb-record-layer-core/src/main/proto/index_build.proto +++ b/fdb-record-layer-core/src/main/proto/index_build.proto @@ -46,4 +46,10 @@ message IndexBuildIndexingStamp { optional string blockID = 7; // optional, a short string that describes the reason for the block. } + message IndexBuildHeartbeat { + required string info = 1; + required int64 genesisTimeMilliseconds = 2; + required int64 heartbeatTimeMilliseconds = 3; + } + diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeatLowLevelTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeatLowLevelTest.java new file mode 100644 index 0000000000..8d68005ecc --- /dev/null +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeatLowLevelTest.java @@ -0,0 +1,372 @@ +/* + * testIndexingHeartbeaLowLevel.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb; + +import com.apple.foundationdb.record.IndexBuildProto; +import com.apple.foundationdb.record.RecordMetaData; +import com.apple.foundationdb.record.RecordMetaDataBuilder; +import com.apple.foundationdb.record.TestRecords1Proto; +import com.apple.foundationdb.record.metadata.Index; +import com.apple.foundationdb.record.metadata.IndexOptions; +import com.apple.foundationdb.record.metadata.IndexTypes; +import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression; +import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression; +import com.apple.foundationdb.record.metadata.expressions.VersionKeyExpression; +import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath; +import com.apple.foundationdb.record.test.FDBDatabaseExtension; +import com.apple.foundationdb.record.test.TestKeySpace; +import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension; +import com.google.protobuf.Descriptors; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static com.apple.foundationdb.record.metadata.Key.Expressions.concat; +import static com.apple.foundationdb.record.metadata.Key.Expressions.field; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class IndexingHeartbeatLowLevelTest { + @RegisterExtension + final FDBDatabaseExtension dbExtension = new FDBDatabaseExtension(); + @RegisterExtension + final TestKeySpacePathManagerExtension pathManager = new TestKeySpacePathManagerExtension(dbExtension); + FDBDatabase fdb; + KeySpacePath path; + FDBRecordStore recordStore; + RecordMetaData metaData; + + @BeforeEach + public void setUp() { + final FDBDatabaseFactory factory = dbExtension.getDatabaseFactory(); + factory.setInitialDelayMillis(2L); + factory.setMaxDelayMillis(4L); + factory.setMaxAttempts(100); + + fdb = dbExtension.getDatabase(); + fdb.setAsyncToSyncTimeout(5, TimeUnit.MINUTES); + path = pathManager.createPath(TestKeySpace.RECORD_STORE); + } + + FDBRecordContext openContext() { + FDBRecordContext context = fdb.openContext(); + FDBRecordStore.Builder builder = createStoreBuilder() + .setContext(context); + recordStore = builder.createOrOpen(FDBRecordStoreBase.StoreExistenceCheck.NONE); + metaData = recordStore.getRecordMetaData(); + return context; + } + + @Nonnull + private FDBRecordStore.Builder createStoreBuilder() { + return FDBRecordStore.newBuilder() + .setMetaDataProvider(metaData) + .setKeySpacePath(path); + } + + void openMetaData(@Nonnull Descriptors.FileDescriptor descriptor, @Nonnull FDBRecordStoreTestBase.RecordMetaDataHook hook) { + RecordMetaDataBuilder metaDataBuilder = RecordMetaData.newBuilder().setRecords(descriptor); + hook.apply(metaDataBuilder); + metaData = metaDataBuilder.getRecordMetaData(); + } + + void openMetaData(@Nonnull Descriptors.FileDescriptor descriptor) { + openMetaData(descriptor, (metaDataBuilder) -> { + }); + } + + void openSimpleMetaData(@Nonnull FDBRecordStoreTestBase.RecordMetaDataHook hook) { + openMetaData(TestRecords1Proto.getDescriptor(), hook); + } + + protected static FDBRecordStoreTestBase.RecordMetaDataHook allIndexesHook(List indexes) { + return metaDataBuilder -> { + for (Index index: indexes) { + metaDataBuilder.addIndex("MySimpleRecord", index); + } + } ; + } + + void testHeartbeatLowLevel(List indexes) { + Assertions.assertThat(indexes).hasSizeGreaterThanOrEqualTo(2); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + + final int count = 10; + IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count]; + for (int i = 0; i < count; i++) { + heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "test", 100 + i, false); + } + + // populate heartbeats + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + for (var heartbeat : heartbeats) { + heartbeat.updateHeartbeat(recordStore, indexes.get(0)); + heartbeat.updateHeartbeat(recordStore, indexes.get(1)); + } + context.commit(); + } + + // Verify query/clear operation + openSimpleMetaData(hook); + Index index = indexes.get(0); + try (FDBRecordContext context = openContext()) { + // Query, unlimited + Map queried = + IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(queried).hasSize(count); + Assertions.assertThat(queried.keySet()) + .containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(heartbeat -> heartbeat.indexerId).collect(Collectors.toList())); + + // Query, partial + queried = + IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 5).join(); + Assertions.assertThat(queried).hasSize(5); + + // clear, partial + int countDeleted = + IndexingHeartbeat.clearIndexingHeartbeats(recordStore, index, 0, 7).join(); + Assertions.assertThat(countDeleted).isEqualTo(7); + queried = + IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 5).join(); + Assertions.assertThat(queried).hasSize(3); + context.commit(); + } + + // Verify that the previous clear does not affect other index + openSimpleMetaData(hook); + index = indexes.get(1); + try (FDBRecordContext context = openContext()) { + Map queried = + IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 100).join(); + Assertions.assertThat(queried).hasSize(count); + Assertions.assertThat(queried.keySet()) + .containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.indexerId).collect(Collectors.toList())); + + // clear all + int countDeleted = + IndexingHeartbeat.clearIndexingHeartbeats(recordStore, index, 0, 0).join(); + Assertions.assertThat(countDeleted).isEqualTo(count); + + // verify empty + queried = + IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(queried).isEmpty(); + context.commit(); + } + } + + @Test + void testHeartbeatLowLevelValueIndexes() { + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + indexes.add(new Index("indexB", field("num_value_3_indexed"), IndexTypes.VALUE)); + testHeartbeatLowLevel(indexes); + } + + @Test + void testHeartbeatLowLevelSumCountIndexes() { + List indexes = new ArrayList<>(); + indexes.add(new Index("indexE", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM)); + indexes.add(new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT)); + testHeartbeatLowLevel(indexes); + } + + @Test + void testHeartbeatLowLevelVersionIndexes() { + List indexes = new ArrayList<>(); + indexes.add(new Index("versionIndex1", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION)); + indexes.add(new Index("versionIndex2", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION)); + testHeartbeatLowLevel(indexes); + } + + @Test + void testCheckAndUpdateByRecords() { + Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index)); + IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(30), false); + + // Successfully update heartbeat + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + // Successfully update heartbeat + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).hasSize(1); + heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(30), false); + Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId); + // Fail to create another 'BY_RECORD` heartbeat + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).hasSize(1); + final CompletionException ex = assertThrows(CompletionException.class, () -> heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join()); + Assertions.assertThat(ex.getMessage()).contains("SynchronizedSessionLockedException"); + context.commit(); + } + + // Successfully clear heartbeat1 + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).hasSize(1); + heartbeat1.clearHeartbeat(recordStore, index); + context.commit(); + } + + // Successfully update heartbeat2 + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + // Successfully clear heartbeat2 + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeat2.clearHeartbeat(recordStore, index); + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).isEmpty(); + context.commit(); + } + } + + @Test + void testCheckAndUpdateMutual() { + Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index)); + + final int count = 10; + IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count]; + for (int i = 0; i < count; i++) { + heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "Mutual", TimeUnit.SECONDS.toMillis(100), true); + } + + // Successfully check//update all heartbeats + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + for (IndexingHeartbeat heartbeat: heartbeats) { + heartbeat.checkAndUpdateHeartbeat(recordStore, index).join(); + } + context.commit(); + } + + // Check count, clear all + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).hasSize(count); + + for (IndexingHeartbeat heartbeat: heartbeats) { + heartbeat.clearHeartbeat(recordStore, index); + } + context.commit(); + } + + // verify cleared + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).isEmpty(); + context.commit(); + } + } + + @Test + void testExpiredHeartbeat() throws InterruptedException { + Index index = new Index("versionIndex1", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index)); + IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(10), false); + + // Successfully update heartbeat1 + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + // Delay 20, set heartbeat2's lease to 4 + Thread.sleep(20); + IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), "Test", 4, false); + Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId); + + // heartbeat2 successfully takes over + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final Map existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join(); + Assertions.assertThat(existingHeartbeats).hasSize(1); + heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + } + + @Test + void testHeartbeatExpiration() throws InterruptedException { + Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index)); + + final IndexingHeartbeat heartbeatA = new IndexingHeartbeat(UUID.randomUUID(), "a", 500, false); + final IndexingHeartbeat heartbeatB = new IndexingHeartbeat(UUID.randomUUID(), "b", 5, false); + + // Set heartbeat A + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeatA.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + Thread.sleep(100); + // Expect heartbeatA to expire after 5 milliseconds, and successfully set heartbeatB + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + heartbeatB.checkAndUpdateHeartbeat(recordStore, index).join(); + context.commit(); + } + + // Expect heartbeatA to fail check/update + // Note: if become flakey, increase the least time of heartbeatA + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + final CompletionException ex = assertThrows(CompletionException.class, () -> heartbeatA.checkAndUpdateHeartbeat(recordStore, index).join()); + Assertions.assertThat(ex.getMessage()).contains("SynchronizedSessionLockedException"); + context.commit(); + } + } +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerBuildIndexTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerBuildIndexTest.java index 1bc4687041..ff948415dd 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerBuildIndexTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerBuildIndexTest.java @@ -21,8 +21,8 @@ package com.apple.foundationdb.record.provider.foundationdb; import com.apple.foundationdb.async.AsyncUtil; -import com.apple.foundationdb.async.MoreAsyncUtil; import com.apple.foundationdb.async.RangeSet; +import com.apple.foundationdb.record.IndexBuildProto; import com.apple.foundationdb.record.IndexState; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; @@ -34,7 +34,6 @@ import com.apple.foundationdb.tuple.Tuple; import com.apple.test.RandomizedTestUtils; import com.google.protobuf.Message; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadLocalRandom; @@ -180,7 +180,6 @@ void singleRebuild( if (!safeBuild) { indexingPolicy.setIfDisabled(OnlineIndexer.IndexingPolicy.DesiredAction.ERROR) .setIfMismatchPrevious(OnlineIndexer.IndexingPolicy.DesiredAction.ERROR); - builder.setUseSynchronizedSession(false); } if (sourceIndex != null) { indexingPolicy.setSourceIndex(sourceIndex.getName()) @@ -238,12 +237,6 @@ void singleRebuild( }); } } - if (safeBuild) { - buildFuture = MoreAsyncUtil.composeWhenComplete( - buildFuture, - (result, ex) -> indexBuilder.checkAnyOngoingOnlineIndexBuildsAsync().thenAccept(Assertions::assertFalse), - fdb::mapAsyncToSyncException); - } if (recordsWhileBuilding != null && !recordsWhileBuilding.isEmpty()) { int i = 0; @@ -313,6 +306,10 @@ void singleRebuild( )); } } + try (OnlineIndexer indexBuilder = newIndexerBuilder(index).build()) { + final Map heartbeats = indexBuilder.getIndexingHeartbeats(0); + assertTrue(heartbeats.isEmpty()); + } KeyValueLogMessage msg = KeyValueLogMessage.build("building index - completed", TestLogMessageKeys.INDEX, index); msg.addKeysAndValues(timer.getKeysAndValues()); LOGGER.info(msg.toString()); diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java index 49bbfe9b3a..019b7f9ca3 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java @@ -23,14 +23,13 @@ import com.apple.foundationdb.record.IndexBuildProto; import com.apple.foundationdb.record.RecordCoreException; import com.apple.foundationdb.record.TestRecords1Proto; -import com.apple.foundationdb.record.logging.KeyValueLogMessage; -import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.metadata.IndexOptions; import com.apple.foundationdb.record.metadata.IndexTypes; import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression; import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression; import com.apple.foundationdb.record.metadata.expressions.KeyExpression; +import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException; import com.apple.test.BooleanSource; import com.google.common.collect.Comparators; import org.junit.jupiter.api.Test; @@ -42,6 +41,9 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1160,43 +1162,67 @@ void testIndexFromIndexBlock() { } @Test - void testIndexFromIndexIgnoreSyncLock() { + void testForbidConcurrentIndexFromIndexSessions() throws InterruptedException { + // Do not let a conversion of few indexes of an active multi-target session + final int numRecords = 59; + populateData(numRecords); - final long numRecords = 180; + Index sourceIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS); + openSimpleMetaData(metaDataBuilder -> metaDataBuilder.addIndex("MySimpleRecord", sourceIndex)); + buildIndexClean(sourceIndex); - Index srcIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS); + // Partly build index Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed"), IndexTypes.VALUE); - FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(srcIndex, tgtIndex); - - populateData(numRecords); - + FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(sourceIndex, tgtIndex); openSimpleMetaData(hook); - buildIndexClean(srcIndex); - disableAll(List.of(tgtIndex)); - - openSimpleMetaData(hook); - - IntStream.rangeClosed(0, 4).parallel().forEach(id -> { - snooze(100 - id); - try { - try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex) - .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() - .setSourceIndex("src_index") - .forbidRecordScan()) - .setLimit(5) - .setUseSynchronizedSession(id == 0) - .setMaxRetries(100) // enough to avoid giving up - .build()) { - indexBuilder.buildIndex(true); - } - } catch (IndexingBase.UnexpectedReadableException ex) { - LOGGER.info(KeyValueLogMessage.of("Ignoring lock, got exception", - LogMessageKeys.SESSION_ID, id, - LogMessageKeys.ERROR, ex.getMessage())); + + Semaphore pauseMutualBuildSemaphore = new Semaphore(1); + Semaphore startBuildingSemaphore = new Semaphore(1); + pauseMutualBuildSemaphore.acquire(); + startBuildingSemaphore.acquire(); + AtomicBoolean passed = new AtomicBoolean(false); + Thread t1 = new Thread(() -> { + // build index and pause halfway, allowing an active session test + try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex) + .setLeaseLengthMillis(TimeUnit.SECONDS.toMillis(20)) + .setLimit(4) + .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() + .setSourceIndex("src_index") + .forbidRecordScan()) + .setConfigLoader(old -> { + if (passed.get()) { + try { + startBuildingSemaphore.release(); + pauseMutualBuildSemaphore.acquire(); // pause to try building indexes + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + pauseMutualBuildSemaphore.release(); + } + } else { + passed.set(true); + } + return old; + }) + .build()) { + indexBuilder.buildIndex(); } }); - - assertReadable(List.of(tgtIndex)); - scrubAndValidate(List.of(tgtIndex)); + t1.start(); + startBuildingSemaphore.acquire(); + startBuildingSemaphore.release(); + // Try one index at a time + try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex) + .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() + .setSourceIndex("src_index") + .forbidRecordScan()) + .build()) { + assertThrows(SynchronizedSessionLockedException.class, indexBuilder::buildIndex); + } + // let the other thread finish indexing + pauseMutualBuildSemaphore.release(); + t1.join(); + // happy indexes assertion + assertReadable(tgtIndex); } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMultiTargetTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMultiTargetTest.java index e8892421c8..40b23d7383 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMultiTargetTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMultiTargetTest.java @@ -23,8 +23,6 @@ import com.apple.foundationdb.record.IndexBuildProto; import com.apple.foundationdb.record.RecordCoreException; import com.apple.foundationdb.record.TestRecords1Proto; -import com.apple.foundationdb.record.logging.KeyValueLogMessage; -import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.metadata.Index; import com.apple.foundationdb.record.metadata.IndexOptions; import com.apple.foundationdb.record.metadata.IndexTypes; @@ -37,8 +35,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.util.ArrayList; @@ -50,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.LongStream; import static com.apple.foundationdb.record.metadata.Key.Expressions.field; @@ -63,7 +58,6 @@ */ @Tag(Tags.Slow) class OnlineIndexerMultiTargetTest extends OnlineIndexerTest { - private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIndexerMultiTargetTest.class); private void populateOtherData(final long numRecords) { List records = LongStream.range(0, numRecords).mapToObj(val -> @@ -970,47 +964,4 @@ void testForbidConversionOfActiveMultiTargetToMutual() throws InterruptedExcepti // happy indexes assertion assertReadable(indexes); } - - @ParameterizedTest - @BooleanSource - void testMultiTargetIgnoringSyncLock(boolean reverseScan) { - // Simply build the index - - final long numRecords = 180; - - List indexes = new ArrayList<>(); - indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); - indexes.add(new Index("indexB", field("num_value_3_indexed"), IndexTypes.VALUE)); - indexes.add(new Index("indexC", field("num_value_unique"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); - indexes.add(new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT)); - - populateData(numRecords); - - FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); - openSimpleMetaData(hook); - disableAll(indexes); - - IntStream.rangeClosed(0, 4).parallel().forEach(id -> { - snooze(100 - id); - try { - try (OnlineIndexer indexBuilder = newIndexerBuilder(indexes) - .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() - .setReverseScanOrder(reverseScan)) - .setLimit(5) - .setUseSynchronizedSession(id == 0) - .setMaxRetries(100) // enough to avoid giving up - .build()) { - indexBuilder.buildIndex(true); - } - } catch (IndexingBase.UnexpectedReadableException ex) { - LOGGER.info(KeyValueLogMessage.of("Ignoring lock, got exception", - LogMessageKeys.SESSION_ID, id, - LogMessageKeys.ERROR, ex.getMessage())); - } - }); - - assertReadable(indexes); - scrubAndValidate(indexes); - } - } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerSimpleTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerSimpleTest.java index 89ae6a7f58..58a92c58d5 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerSimpleTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerSimpleTest.java @@ -406,7 +406,6 @@ public void testConfigLoader() throws Exception { // once" may not hold true. With the two options, it is running doBuildIndexAsync effectively. Probably // it would be better if this test can test the config loader with bare metal OnlineIndexer.runAsync // instead. - .setUseSynchronizedSession(false) .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() .setIfReadable(OnlineIndexer.IndexingPolicy.DesiredAction.ERROR) .setIfWriteOnly(OnlineIndexer.IndexingPolicy.DesiredAction.CONTINUE) @@ -888,4 +887,32 @@ public void runWithWeakReadSemantics() { fdb.setTrackLastSeenVersionOnRead(dbTracksReadVersionOnCommit); } } + + @Test + @SuppressWarnings("removal") + void testDeprecatedSetUseSynchronizedSession() { + List records = LongStream.range(0, 20).mapToObj(val -> + TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(val).setNumValue2((int)val + 1).build() + ).collect(Collectors.toList()); + Index index = new Index("simple$value_2", field("num_value_2").ungrouped(), IndexTypes.SUM); + FDBRecordStoreTestBase.RecordMetaDataHook hook = metaDataBuilder -> metaDataBuilder.addIndex("MySimpleRecord", index); + + openSimpleMetaData(); + try (FDBRecordContext context = openContext()) { + records.forEach(recordStore::saveRecord); + context.commit(); + } + + openSimpleMetaData(hook); + disableAll(List.of(index)); + + openSimpleMetaData(hook); + try (OnlineIndexer indexBuilder = newIndexerBuilder(index) + .setUseSynchronizedSession(true) + .build()) { + indexBuilder.buildIndex(); + } + + assertReadable(index); + } } diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexingHeartbeatTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexingHeartbeatTest.java new file mode 100644 index 0000000000..c110536316 --- /dev/null +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexingHeartbeatTest.java @@ -0,0 +1,356 @@ +/* + * OnlineIndexingHeartbeatTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb; + +import com.apple.foundationdb.record.IndexBuildProto; +import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.metadata.Index; +import com.apple.foundationdb.record.metadata.IndexOptions; +import com.apple.foundationdb.record.metadata.IndexTypes; +import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.apple.foundationdb.record.metadata.Key.Expressions.field; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verify indexing heartbeat activity (query & clear). + */ +class OnlineIndexingHeartbeatTest extends OnlineIndexerTest { + + @Test + void testHeartbeatLowLevel() { + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + indexes.add(new Index("indexB", field("num_value_3_indexed"), IndexTypes.VALUE)); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + + final int count = 10; + IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count]; + for (int i = 0; i < count; i++) { + heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "Test", 100 + i, true); + } + + openSimpleMetaData(hook); + try (FDBRecordContext context = openContext()) { + for (var heartbeat : heartbeats) { + heartbeat.updateHeartbeat(recordStore, indexes.get(0)); + heartbeat.updateHeartbeat(recordStore, indexes.get(1)); + } + context.commit(); + } + + // Verify query/clear operation + try (OnlineIndexer indexer = newIndexerBuilder(indexes.get(0)).build()) { + // Query, unlimited + Map queried = indexer.getIndexingHeartbeats(0); + Assertions.assertThat(queried).hasSize(count); + Assertions.assertThat(queried.keySet()) + .containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(heartbeat -> heartbeat.indexerId).collect(Collectors.toList())); + + // Query, partial + queried = indexer.getIndexingHeartbeats(5); + Assertions.assertThat(queried).hasSize(5); + + // clear, partial + int countDeleted = indexer.clearIndexingHeartbeats(0, 7); + Assertions.assertThat(countDeleted).isEqualTo(7); + queried = indexer.getIndexingHeartbeats(5); + Assertions.assertThat(queried).hasSize(3); + } + + // Verify that the previous clear does not affect other index + try (OnlineIndexer indexer = newIndexerBuilder(indexes.get(1)).build()) { + Map queried = indexer.getIndexingHeartbeats(100); + Assertions.assertThat(queried).hasSize(count); + Assertions.assertThat(queried.keySet()) + .containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.indexerId).collect(Collectors.toList())); + + // clear all + int countDeleted = indexer.clearIndexingHeartbeats(0, 0); + Assertions.assertThat(countDeleted).isEqualTo(count); + + // verify empty + queried = indexer.getIndexingHeartbeats(0); + Assertions.assertThat(queried).isEmpty(); + } + } + + @ParameterizedTest + @BooleanSource + void testIndexersHeartbeatsClearAfterBuild(boolean mutualIndexing) { + // Assert that the heartbeats are cleared after building + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + indexes.add(new Index("indexC", field("num_value_unique"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + int numRecords = 77; + populateData(numRecords); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + openSimpleMetaData(hook); + disableAll(indexes); + + if (mutualIndexing) { + int boundarySize = 23; + final List boundariesList = getBoundariesList(numRecords, boundarySize); + IntStream.rangeClosed(1, 5).parallel().forEach(i -> { + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() + .setMutualIndexingBoundaries(boundariesList)) + .build()) { + indexer.buildIndex(); + } + }); + } else { + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .build()) { + indexer.buildIndex(); + } + } + + for (Index index : indexes) { + try (OnlineIndexer indexer = newIndexerBuilder(index).build()) { + Assertions.assertThat(indexer.getIndexingHeartbeats(0)).isEmpty(); + } + } + } + + @ParameterizedTest + @BooleanSource + void testIndexersHeartbeatsClearAfterCrash(boolean mutualIndexing) { + // Assert that the heartbeats are cleared after crash + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + indexes.add(new Index("indexC", field("num_value_unique"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + int numRecords = 98; + populateData(numRecords); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + openSimpleMetaData(hook); + disableAll(indexes); + + final String testThrowMsg = "Intentionally crash during test"; + final AtomicLong counter = new AtomicLong(0); + if (mutualIndexing) { + int boundarySize = 20; + final List boundariesList = getBoundariesList(numRecords, boundarySize); + IntStream.rangeClosed(1, 9).parallel().forEach(i -> { + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .setLimit(10) + .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() + .setMutualIndexingBoundaries(boundariesList)) + .setConfigLoader(old -> { + // Unfortunately, we cannot verify that at least one heartbeat exists from this + // block, as it would have been nesting "asyncToSync" functions. But there are other tests + // that verify the "sync lock" functionality. + if (counter.incrementAndGet() > 2) { + throw new RecordCoreException(testThrowMsg); + } + return old; + }) + .build()) { + RecordCoreException e = assertThrows(RecordCoreException.class, indexer::buildIndex); + assertTrue(e.getMessage().contains(testThrowMsg)); + } + }); + } else { + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .setLimit(10) + .setConfigLoader(old -> { + // Unfortunately, we cannot verify that at least one heartbeat exists from this + // block, as it would have been nesting "asyncToSync" functions. But there are other tests + // that verify the "sync lock" functionality. + if (counter.incrementAndGet() > 2) { + throw new RecordCoreException(testThrowMsg); + } + return old; + }) + .build()) { + RecordCoreException e = assertThrows(RecordCoreException.class, indexer::buildIndex); + assertTrue(e.getMessage().contains(testThrowMsg)); + } + } + + for (Index index : indexes) { + try (OnlineIndexer indexer = newIndexerBuilder(index).build()) { + Assertions.assertThat(indexer.getIndexingHeartbeats(0)).isEmpty(); + } + } + } + + @Test + void testMutualIndexersHeartbeatsClearAfterBuild() throws InterruptedException { + // Check heartbeats count during mutual indexing + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + indexes.add(new Index("indexC", field("num_value_unique"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + int numRecords = 77; + populateData(numRecords); + int boundarySize = 5; + final List boundariesList = getBoundariesList(numRecords, boundarySize); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + openSimpleMetaData(hook); + disableAll(indexes); + + Semaphore pauseSemaphore = new Semaphore(1); + Semaphore startSemaphore = new Semaphore(1); + final AtomicInteger count = new AtomicInteger(0); + pauseSemaphore.acquire(); + startSemaphore.acquire(); + AtomicReference> heartbeats = new AtomicReference<>(); + IntStream.rangeClosed(1, 4).parallel().forEach(i -> { + if (i == 4) { + try { + startSemaphore.acquire(); + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (OnlineIndexer indexer = newIndexerBuilder(indexes).build()) { + heartbeats.set(indexer.getIndexingHeartbeats(0)); + } + startSemaphore.release(); + pauseSemaphore.release(); + } else { + AtomicInteger counter = new AtomicInteger(0); + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder() + .setMutualIndexingBoundaries(boundariesList)) + .setConfigLoader(old -> { + if (counter.incrementAndGet() > 0) { + if (count.incrementAndGet() == 2) { + startSemaphore.release(); + } + try { + pauseSemaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + pauseSemaphore.release(); + } + } + return old; + }) + .build()) { + indexer.buildIndex(); + } + } + }); + // While building, heartbeats count should have been 3 + Assertions.assertThat(heartbeats.get()).hasSize(3); + + // After building, heartbeats count should be 0 + try (OnlineIndexer indexer = newIndexerBuilder(indexes).build()) { + heartbeats.set(indexer.getIndexingHeartbeats(0)); + } + } + + + @Test + void testHeartbeatsRenewal() throws InterruptedException { + // make sure that the heartbeats behave as expected during indexing: + // single item + // same indexerId, genesis time + // monotonically increasing heartbeats + List indexes = new ArrayList<>(); + indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS)); + int numRecords = 74; + populateData(numRecords); + FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes); + openSimpleMetaData(hook); + disableAll(indexes); + final List> heartbeatsQueries = new ArrayList<>(); + + Semaphore indexerGo = new Semaphore(1); + Semaphore colectorGo = new Semaphore(1); + AtomicBoolean indexerDone = new AtomicBoolean(false); + colectorGo.acquire(); + Thread indexerThread = new Thread( () -> { + try (OnlineIndexer indexer = newIndexerBuilder(indexes) + .setLimit(10) + .setConfigLoader(old -> { + colectorGo.release(); + try { + indexerGo.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return old; + }) + .build()) { + indexer.buildIndex(); + } + colectorGo.release(); + indexerDone.set(true); + }); + + Thread collectorThread = new Thread(() -> { + while (!indexerDone.get()) { + try { + colectorGo.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try (FDBRecordContext context = openContext()) { + final Map heartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, indexes.get(0), 0).join(); + heartbeatsQueries.add(heartbeats); + context.commit(); + } + indexerGo.release(); + } + }); + indexerThread.start(); + collectorThread.start(); + collectorThread.join(); + indexerThread.join(); + + Assertions.assertThat(heartbeatsQueries).hasSizeGreaterThan(5); + Assertions.assertThat(heartbeatsQueries.get(0)).hasSize(1); + final Map.Entry first = heartbeatsQueries.get(0).entrySet().iterator().next(); + Map.Entry previous = first; + for (int i = 1; i < heartbeatsQueries.size() - 1; i++) { + Assertions.assertThat(heartbeatsQueries.get(i)).hasSize(1); + final Map.Entry item = heartbeatsQueries.get(i).entrySet().iterator().next(); + Assertions.assertThat(item.getKey()).isEqualTo(first.getKey()); + Assertions.assertThat(item.getValue().getGenesisTimeMilliseconds()).isEqualTo(first.getValue().getGenesisTimeMilliseconds()); + Assertions.assertThat(item.getValue().getInfo()).isEqualTo(first.getValue().getInfo()); + Assertions.assertThat(item.getValue().getHeartbeatTimeMilliseconds()) + .isGreaterThan(previous.getValue().getHeartbeatTimeMilliseconds()); + previous = item; + } + } +}