Skip to content

Commit 06c9edd

Browse files
committed
Set/clear heartbeats at start/end
1 parent aa7b49e commit 06c9edd

File tree

3 files changed

+66
-26
lines changed

3 files changed

+66
-26
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBStoreTimer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,10 @@ public enum Waits implements Wait {
413413
WAIT_INDEX_OPERATION("wait for index operation"),
414414
/** Wait for indexing type stamp operation. */
415415
WAIT_INDEX_TYPESTAMP_OPERATION("wait for indexing type stamp operation"),
416+
/** Wait for clearing indexing heartbeats. */
417+
WAIT_INDEX_CLEAR_HEARTBEATS("Wait for clearing indexing heartbeats"),
418+
/** Wait for reading indexing heartbeats. */
419+
WAIT_INDEX_READ_HEARTBEATS("Wait for reading indexing heartbeats"),
416420
/** Wait for adding an index. */
417421
WAIT_ADD_INDEX("wait for adding an index"),
418422
/** Wait for dropping an index. */

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -160,19 +160,35 @@ public CompletableFuture<Void> buildIndexAsync(boolean markReadable, boolean use
160160
lastProgressSnapshot = StoreTimerSnapshot.from(timer);
161161
}
162162
message.addKeyAndValue(LogMessageKeys.SESSION_ID, common.getUuid());
163-
return handleStateAndDoBuildIndexAsync(markReadable, message).whenComplete((vignore, ex) -> {
164-
message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build
165-
.addKeysAndValues(common.indexLogMessageKeyValues())
166-
.addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos));
167-
if (LOGGER.isWarnEnabled() && (ex != null)) {
168-
message.addKeyAndValue(LogMessageKeys.RESULT, "failure");
169-
message.addKeysAndValues(throttle.logMessageKeyValues()); // this "last attempt" snapshot information can help debugging
170-
LOGGER.warn(message.toString(), ex);
171-
} else if (LOGGER.isInfoEnabled()) {
172-
message.addKeyAndValue(LogMessageKeys.RESULT, "success");
173-
LOGGER.info(message.toString());
174-
}
175-
});
163+
AtomicReference<Throwable> indexingException = new AtomicReference<>(null);
164+
return handleStateAndDoBuildIndexAsync(markReadable, message)
165+
.handle((ret, ex) -> {
166+
if (ex != null) {
167+
indexingException.set(ex);
168+
}
169+
message.addKeysAndValues(indexingLogMessageKeyValues()) // add these here to pick up state accumulated during build
170+
.addKeysAndValues(common.indexLogMessageKeyValues())
171+
.addKeyAndValue(LogMessageKeys.TOTAL_MICROS, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos));
172+
if (LOGGER.isWarnEnabled() && (ex != null)) {
173+
message.addKeyAndValue(LogMessageKeys.RESULT, "failure");
174+
message.addKeysAndValues(throttle.logMessageKeyValues()); // this "last attempt" snapshot information can help debugging
175+
LOGGER.warn(message.toString(), ex);
176+
} else if (LOGGER.isInfoEnabled()) {
177+
message.addKeyAndValue(LogMessageKeys.RESULT, "success");
178+
LOGGER.info(message.toString());
179+
}
180+
return ret;
181+
})
182+
.thenCompose(ignore -> clearHeartbeats())
183+
.handle((ignore, exIgnore) -> {
184+
Throwable ex = indexingException.get();
185+
if (ex instanceof RuntimeException) {
186+
throw (RuntimeException) ex;
187+
} else if (ex != null) {
188+
throw new RuntimeException(ex);
189+
}
190+
return null;
191+
});
176192
}
177193

178194
abstract List<Object> indexingLogMessageKeyValues();
@@ -266,7 +282,7 @@ private CompletableFuture<Void> markIndexesWriteOnly(boolean continueBuild, FDBR
266282
@Nonnull
267283
public CompletableFuture<Boolean> markReadableIfBuilt() {
268284
AtomicBoolean allReadable = new AtomicBoolean(true);
269-
return common.getRunner().runAsync(context -> openRecordStore(context).thenCompose(store ->
285+
return getRunner().runAsync(context -> openRecordStore(context).thenCompose(store ->
270286
forEachTargetIndex(index -> {
271287
if (store.isIndexReadable(index)) {
272288
return AsyncUtil.DONE;
@@ -313,12 +329,9 @@ public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease)
313329
private CompletableFuture<Boolean> markIndexReadableSingleTarget(Index index, AtomicBoolean anythingChanged,
314330
AtomicReference<RuntimeException> runtimeExceptionAtomicReference) {
315331
// An extension function to reduce markIndexReadable's complexity
316-
return common.getRunner().runAsync(context ->
332+
return getRunner().runAsync(context ->
317333
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
318334
.thenCompose(store -> {
319-
if (heartbeat != null) {
320-
heartbeat.clearHeartbeat(store, index);
321-
}
322335
return policy.shouldAllowUniquePendingState(store) ?
323336
store.markIndexReadableOrUniquePending(index) :
324337
store.markIndexReadable(index);
@@ -352,12 +365,13 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
352365

353366
@Nonnull
354367
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild, Index index, IndexBuildProto.IndexBuildIndexingStamp newStamp) {
368+
final CompletableFuture<Void> checkUpdateHeartbeat = updateHeartbeat(true, store, index);
355369
if (forceStampOverwrite && !continuedBuild) {
356370
// Fresh session + overwrite = no questions asked
357371
store.saveIndexingTypeStamp(index, newStamp);
358-
return AsyncUtil.DONE;
372+
return checkUpdateHeartbeat;
359373
}
360-
return store.loadIndexingTypeStampAsync(index)
374+
return checkUpdateHeartbeat.thenCompose(ignore -> store.loadIndexingTypeStampAsync(index)
361375
.thenCompose(savedStamp -> {
362376
if (savedStamp == null) {
363377
if (continuedBuild && newStamp.getMethod() !=
@@ -399,7 +413,7 @@ private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boo
399413
}
400414
// fall down to exception
401415
throw newPartlyBuiltException(continuedBuild, savedStamp, newStamp, index);
402-
});
416+
}));
403417
}
404418

405419
private boolean shouldAllowTypeConversionContinue(IndexBuildProto.IndexBuildIndexingStamp newStamp, IndexBuildProto.IndexBuildIndexingStamp savedStamp) {
@@ -521,7 +535,7 @@ protected CompletableFuture<Boolean> doneOrThrottleDelayAndMaybeLogProgress(bool
521535

522536
validateTimeLimit(toWait);
523537

524-
CompletableFuture<Boolean> delay = MoreAsyncUtil.delayedFuture(toWait, TimeUnit.MILLISECONDS, common.getRunner().getScheduledExecutor()).thenApply(vignore3 -> true);
538+
CompletableFuture<Boolean> delay = MoreAsyncUtil.delayedFuture(toWait, TimeUnit.MILLISECONDS, getRunner().getScheduledExecutor()).thenApply(vignore3 -> true);
525539
if (getRunner().getTimer() != null) {
526540
delay = getRunner().getTimer().instrument(FDBStoreTimer.Events.INDEXER_DELAY, delay, getRunner().getExecutor());
527541
}
@@ -835,6 +849,25 @@ private CompletableFuture<Void> updateHeartbeat(boolean validate, FDBRecordStore
835849
return AsyncUtil.DONE;
836850
}
837851

852+
private CompletableFuture<Void> clearHeartbeats() {
853+
// Here: if the heartbeat was *not* cleared while marking the index readable, it would be cleared in
854+
// these dedicated transactions. Heartbeat clearing is not a blocker but a "best effort" operation.
855+
if (heartbeat == null) {
856+
return AsyncUtil.DONE;
857+
}
858+
return forEachTargetIndex(this::clearHeartbeatSingleTarget);
859+
}
860+
861+
private CompletableFuture<Void> clearHeartbeatSingleTarget(Index index) {
862+
return getRunner().runAsync(context ->
863+
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
864+
.thenApply(store -> {
865+
heartbeat.clearHeartbeat(store, index);
866+
return null;
867+
}));
868+
}
869+
870+
838871
private boolean shouldValidate() {
839872
final long minimalInterval = policy.getCheckIndexingMethodFrequencyMilliseconds();
840873
if (minimalInterval < 0 || isScrubber) {

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,17 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
8484
}
8585

8686
private void validateNonCompetingHeartbeat(KeyValue kv) {
87-
final Tuple keyTuple = Tuple.from(kv.getKey());
87+
final Tuple keyTuple = Tuple.fromBytes(kv.getKey());
88+
if (keyTuple.size() < 2) { // expecting 8
89+
return;
90+
}
8891
final UUID otherSessionId = keyTuple.getUUID(keyTuple.size() - 1);
8992
final long now = nowMilliseconds();
9093
if (!otherSessionId.equals(this.sessionId)) {
9194
try {
92-
final IndexBuildProto.IndexingHeartbeat otherHearbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
93-
final long age = otherHearbeat.getHeartbeatTimeMilliseconds() - now;
94-
if (age < TimeUnit.SECONDS.toMillis(10)) {
95+
final IndexBuildProto.IndexingHeartbeat otherHeartbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
96+
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
97+
if (age > 0 && age < TimeUnit.SECONDS.toMillis(10)) {
9598
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress");
9699
// TODO: log details
97100
}

0 commit comments

Comments
 (0)