Skip to content

Online Indexer: replace the synchronized runner with a heartbeat #3530

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public enum LogMessageKeys {
TRANSACTION_ID,
TRANSACTION_NAME,
AGE_SECONDS,
AGE_MILLISECONDS,
CONSTITUENT,
TOTAL_MICROS,
// record splitting/unsplitting
Expand Down Expand Up @@ -162,7 +163,7 @@ public enum LogMessageKeys {
RECORDS_PER_SECOND,
DOCUMENT,
SESSION_ID,
INDEXER_SESSION_ID,
EXISTING_INDEXER_ID,
INDEXER_ID,
INDEX_STATE_PRECONDITION,
INITIAL_INDEX_STATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4966,6 +4966,7 @@ public void removeFormerIndex(FormerIndex formerIndex) {

private void clearReadableIndexBuildData(Index index) {
IndexingRangeSet.forIndexBuild(this, index).clear();
IndexingHeartbeat.clearAllHeartbeats(this, index);
}

@SuppressWarnings("PMD.CloseResource")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ public enum Waits implements Wait {
WAIT_INDEX_OPERATION("wait for index operation"),
/** Wait for indexing type stamp operation. */
WAIT_INDEX_TYPESTAMP_OPERATION("wait for indexing type stamp operation"),
/** Wait for clearing indexing heartbeats. */
WAIT_INDEX_CLEAR_HEARTBEATS("Wait for clearing indexing heartbeats"),
/** Wait for reading indexing heartbeats. */
WAIT_INDEX_READ_HEARTBEATS("Wait for reading indexing heartbeats"),
/** Wait for adding an index. */
WAIT_ADD_INDEX("wait for adding an index"),
/** Wait for dropping an index. */
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private Index getSourceIndex(RecordMetaData metaData) {
throw new ValidationException("no source index",
LogMessageKeys.INDEX_NAME, common.getIndex().getName(),
LogMessageKeys.SOURCE_INDEX, policy.getSourceIndex(),
LogMessageKeys.INDEXER_ID, common.getUuid());
LogMessageKeys.INDEXER_ID, common.getIndexerId());
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.metadata.MetaDataException;
import com.apple.foundationdb.record.metadata.RecordType;
import com.apple.foundationdb.record.provider.foundationdb.synchronizedsession.SynchronizedSessionRunner;
import com.apple.foundationdb.record.query.plan.synthetic.SyntheticRecordPlanner;
import com.apple.foundationdb.tuple.Tuple;

Expand All @@ -50,10 +49,9 @@

@API(API.Status.INTERNAL)
public class IndexingCommon {
private final UUID uuid = UUID.randomUUID();
private final UUID indexerId = UUID.randomUUID();

@Nonnull private final FDBDatabaseRunner runner;
@Nullable private SynchronizedSessionRunner synchronizedSessionRunner = null;

@Nonnull private final FDBRecordStore.Builder recordStoreBuilder;
@Nonnull private final AtomicLong totalRecordsScanned;
Expand Down Expand Up @@ -137,8 +135,8 @@ private void fillTargetIndexers(@Nonnull List<Index> targetIndexes, @Nullable Co
}
}

public UUID getUuid() {
return uuid;
public UUID getIndexerId() {
return indexerId;
}

public List<Object> indexLogMessageKeyValues() {
Expand All @@ -158,7 +156,7 @@ public List<Object> indexLogMessageKeyValues(@Nullable String transactionName, @
logIf(true, keyValues,
LogMessageKeys.TARGET_INDEX_NAME, getTargetIndexesNames(),
LogMessageKeys.RECORDS_SCANNED, totalRecordsScanned.get(),
LogMessageKeys.INDEXER_ID, uuid);
LogMessageKeys.INDEXER_ID, indexerId);

if (moreKeyValues != null && !moreKeyValues.isEmpty()) {
keyValues.addAll(moreKeyValues);
Expand All @@ -176,11 +174,6 @@ private void logIf(boolean condition, List<Object> list, @Nonnull Object... a) {

@Nonnull
public FDBDatabaseRunner getRunner() {
return synchronizedSessionRunner == null ? runner : synchronizedSessionRunner;
}

@Nonnull
public FDBDatabaseRunner getNonSynchronizedRunner() {
return runner;
}

Expand Down Expand Up @@ -258,15 +251,6 @@ public FDBRecordStore.Builder getRecordStoreBuilder() {
return recordStoreBuilder;
}

@Nullable
public SynchronizedSessionRunner getSynchronizedSessionRunner() {
return synchronizedSessionRunner;
}

public void setSynchronizedSessionRunner(@Nullable final SynchronizedSessionRunner synchronizedSessionRunner) {
this.synchronizedSessionRunner = synchronizedSessionRunner;
}

@Nonnull
public AtomicLong getTotalRecordsScanned() {
return totalRecordsScanned;
Expand All @@ -287,8 +271,5 @@ public boolean loadConfig() {

public void close() {
runner.close();
if (synchronizedSessionRunner != null) {
synchronizedSessionRunner.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* IndexingHeartbeat.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.async.AsyncIterator;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.IndexBuildProto;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException;
import com.google.protobuf.InvalidProtocolBufferException;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexingHeartbeat {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a bit of the code could be more generic. Particularly, I see that the indexingMethod in the heartbeat is not read by anything, indicating that is more for logging purposes, and could be replaced by a string.
If you pull out the logic for what the limit should be you could have this completely independent of indexing, and provide a raw subspace to store the information. Doing so would make some testing scenarios easier, and allow for re-use in different scenarios.
It would also allow for testing some of this logic with ThrottledRetryingIterator, and eventually could allow for a LimitedConcurrencyThrottledIterator which would be analogous to SynchronizedSession. That wouldn't be immediately relevant, but would allow for more controlled testing of some of the behaviors as you would be able to have better control over when individual futures complete, and run more interesting code in the transactions (such as fetching the list of existing heartbeats yourself).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going in a completely different direction, would it make sense to combine the heartbeats more with the type stamp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indexingMethod is for:
a) Debugging (if we'll ever find a leftover heartbeat).
b) Providing extra information when heartbeats are queried. Hopefully that will enable better decision making in any automated system that monitors indexing processes (decisions such as: adding a mutual indexing thread, converting to mutual, stopping a slow by-source-index session and restart indexing, etc.)

I am not sure that I understand your LimitedConcurrencyThrottledIterator idea.

The type stamp is complementary to the heartbeat. Initially I thought about making a copy of the type stamp to each heartbeat, but it seems easier - for integrity reasons - to keep the type stamp as is.

// [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time]
final UUID indexerId;
final String info;
final long genesisTimeMilliseconds;
final long leaseLength;
final boolean allowMutual;

public IndexingHeartbeat(final UUID indexerId, String info, long leaseLength, boolean allowMutual) {
this.indexerId = indexerId;
this.info = info;
this.leaseLength = leaseLength;
this.allowMutual = allowMutual;
this.genesisTimeMilliseconds = nowMilliseconds();
}

public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack();
byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
.setInfo(info)
.setGenesisTimeMilliseconds(genesisTimeMilliseconds)
.setHeartbeatTimeMilliseconds(nowMilliseconds())
.build().toByteArray();
store.ensureContextActive().set(key, value);
}

public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
// complete exceptionally if non-mutual, other exists
if (allowMutual) {
updateHeartbeat(store, index);
return AsyncUtil.DONE;
}

final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
final long now = nowMilliseconds();
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
.thenApply(hasNext -> {
if (!hasNext) {
return false;
}
final KeyValue kv = iterator.next();
try {
final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
if (!otherIndexerId.equals(this.indexerId)) {
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
if (age > 0 && age < leaseLength) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this clear the heartbeat if it is expired?
Are you avoiding that to avoid conflicts by a bunch of workers discovering that one has expired?
Does the indexing process guarantee that they are cleared when it fails?

If the instance crashes does the heartbeat stick around until you complete an index build (successfully, or with an exception)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I believe that every process clears its own heartbeat. Instance crashes should be too rare to cause significant DB space leak (or so I hope). If needed, a cleanup can be triggered by the user with the Indexer's new clearIndexingHeartbeats API.

Saying that, we can clear all heartbeats in FDBRecordStore.clearReadableIndexBuildData, which is triggered when the index becomes readable. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearing all heartbeats with an index becomes readable.

// For practical reasons, this exception is backward compatible to the Synchronized Lock one
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably reasonable to keep this exception as is, for backwards-compatibility, but worth a comment.
Or perhaps worth throwing an exception extending from SynchronizedSessionLockedException (and having a comment there about why)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had added a comment. I know about some users that have special handling on this exception, it might be worth keeping it backward compatible.

.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
.addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age)
.addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength);
}
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return true;
}))
.thenApply(ignore -> {
updateHeartbeat(store, index);
return null;
});
}

public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack());
}

public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) {
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index).range());
}

public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> ret = new HashMap<>();
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
final AtomicInteger iterationCount = new AtomicInteger(0);
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
.thenApply(hasNext -> {
if (!hasNext) {
return false;
}
if (maxCount > 0 && maxCount < iterationCount.incrementAndGet()) {
return false;
}
final KeyValue kv = iterator.next();
final UUID otherIndexerId = heartbeatKeyToIndexerId(store, index, kv.getKey());
try {
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
ret.put(otherIndexerId, otherHeartbeat);
} catch (InvalidProtocolBufferException e) {
// Let the caller know about this invalid heartbeat.
ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
.setInfo("<< Invalid Heartbeat >>")
.build());
}
return true;
}))
.thenApply(ignore -> ret);
}

public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index, long minAgenMilliseconds, int maxIteration) {
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
final AtomicInteger deleteCount = new AtomicInteger(0);
final AtomicInteger iterationCount = new AtomicInteger(0);
final long now = nowMilliseconds();
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
.thenApply(hasNext -> {
if (!hasNext) {
return false;
}
if (maxIteration > 0 && maxIteration < iterationCount.incrementAndGet()) {
return false;
}
final KeyValue kv = iterator.next();
boolean shouldRemove;
try {
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
// remove heartbeat if too old
shouldRemove = now + minAgenMilliseconds >= otherHeartbeat.getHeartbeatTimeMilliseconds();
} catch (InvalidProtocolBufferException e) {
// remove heartbeat if invalid
shouldRemove = true;
}
if (shouldRemove) {
store.ensureContextActive().clear(kv.getKey());
deleteCount.incrementAndGet();
}
return true;
}))
.thenApply(ignore -> deleteCount.get());
}

private static AsyncIterator<KeyValue> heartbeatsIterator(FDBRecordStore store, Index index) {
return store.getContext().ensureActive().snapshot().getRange(IndexingSubspaces.indexheartbeatSubspace(store, index).range()).iterator();
}

private static UUID heartbeatKeyToIndexerId(FDBRecordStore store, Index index, byte[] key) {
return IndexingSubspaces.indexheartbeatSubspace(store, index).unpack(key).getUUID(0);
}

private static long nowMilliseconds() {
return System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ CompletableFuture<Void> mergeIndex(@Nullable SubspaceProvider subspaceProvider)
// Merge operation may take a long time, hence the runner's context must be a read-only. Ensure that it
// isn't a synchronized one, which may attempt a heartbeat write
// Note: this runAsync will retry according to the runner's "maxAttempts" setting
common.getNonSynchronizedRunner().runAsync(context -> openRecordStore(context)
common.getRunner().runAsync(context -> openRecordStore(context)
.thenCompose(store -> {
mergeStartTime.set(System.nanoTime());
final IndexDeferredMaintenanceControl mergeControl = store.getIndexDeferredMaintenanceControl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.apple.foundationdb.tuple.Tuple;

import javax.annotation.Nonnull;
import java.util.UUID;

/**
* List of subspaces related to the indexing/index-scrubbing processes.
Expand All @@ -40,6 +41,7 @@ public final class IndexingSubspaces {
private static final Object INDEX_SCRUBBED_RECORDS_RANGES_ZERO = 4L;
private static final Object INDEX_SCRUBBED_RECORDS_RANGES = 5L;
private static final Object INDEX_SCRUBBED_INDEX_RANGES = 6L;
private static final Object INDEX_BUILD_HEARTBEAT_PREFIX = 7L;

private IndexingSubspaces() {
throw new IllegalStateException("Utility class");
Expand Down Expand Up @@ -83,6 +85,29 @@ public static Subspace indexBuildTypeSubspace(@Nonnull FDBRecordStoreBase<?> sto
return indexBuildSubspace(store, index, INDEX_BUILD_TYPE_VERSION);
}

/**
* Subspace that stores the indexing heartbeat.
* @param store store
* @param index index
* @return subspace
*/
@Nonnull
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index) {
return indexBuildSubspace(store, index, INDEX_BUILD_HEARTBEAT_PREFIX);
}

/**
* Subspace that stores the indexing heartbeat.
* @param store store
* @param index index
* @param indexerId session id
* @return subspace
*/
@Nonnull
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index, @Nonnull UUID indexerId) {
return indexheartbeatSubspace(store, index).subspace(Tuple.from(indexerId));
}

/**
* Subspace that stores scrubbed records ranges of the zero range-id. This subspace is backward compatible
* to record ranges scrubbed before range-id was introduced.
Expand Down Expand Up @@ -184,5 +209,7 @@ public static void eraseAllIndexingDataButTheLock(@Nonnull FDBRecordContext cont
eraseAllIndexingScrubbingData(context, store, index);
context.clear(Range.startsWith(indexBuildScannedRecordsSubspace(store, index).pack()));
context.clear(Range.startsWith(indexBuildTypeSubspace(store, index).pack()));
// The heartbeats, unlike the sync lock, may be erased here. If needed, an appropriate heartbeat will be set after this clear & within the same transaction.
context.clear(Range.startsWith(indexheartbeatSubspace(store, index).pack()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordMetaData;
import com.apple.foundationdb.record.RecordMetaDataProvider;
Expand Down Expand Up @@ -715,15 +716,18 @@ public B setTransactionTimeLimitMilliseconds(long timeLimitMilliseconds) {
* @see SynchronizedSessionRunner
* @param useSynchronizedSession use synchronize session if true, otherwise false
* @return this builder
*
* @deprecated Synchronized sessions are now determined by the indexing method.
*/
@API(API.Status.DEPRECATED)
@SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP
@Deprecated(since = "4.4.3.0", forRemoval = true)
public B setUseSynchronizedSession(boolean useSynchronizedSession) {
configBuilder.setUseSynchronizedSession(useSynchronizedSession);
return self();
}

/**
* Set the lease length in milliseconds if the synchronized session is used. The default value is {@link OnlineIndexOperationConfig#DEFAULT_LEASE_LENGTH_MILLIS}.
* @see #setUseSynchronizedSession(boolean)
* @see com.apple.foundationdb.synchronizedsession.SynchronizedSession
* @param leaseLengthMillis length between last access and lease's end time in milliseconds
* @return this builder
Expand Down
Loading