Skip to content

Commit f05a119

Browse files
committed
Some cleanup
1 parent 1ceda45 commit f05a119

File tree

6 files changed

+29
-22
lines changed

6 files changed

+29
-22
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ public enum LogMessageKeys {
163163
RECORDS_PER_SECOND,
164164
DOCUMENT,
165165
SESSION_ID,
166-
EXISTING_SESSION_ID,
167-
INDEXER_SESSION_ID,
166+
EXISTING_INDEXER_ID,
168167
INDEXER_ID,
169168
INDEX_STATE_PRECONDITION,
170169
INITIAL_INDEX_STATE,

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ public CompletableFuture<Boolean> markReadableIfBuilt() {
305305
).thenApply(ignore -> allReadable.get()), common.indexLogMessageKeyValues("IndexingBase::markReadableIfBuilt"));
306306
}
307307

308-
309308
@Nonnull
310309
public CompletableFuture<Boolean> markIndexReadable(boolean markReadablePlease) {
311310
if (!markReadablePlease) {
@@ -861,6 +860,14 @@ private CompletableFuture<Void> clearHeartbeats() {
861860
.thenAccept(ignore -> heartbeat = null);
862861
}
863862

863+
private void clearHeartbeats(FDBRecordStore store) {
864+
if (heartbeat != null) {
865+
for (Index index : common.getTargetIndexes()) {
866+
clearHeartbeatSingleTarget(store, index);
867+
}
868+
}
869+
}
870+
864871
private CompletableFuture<Void> clearHeartbeatSingleTarget(Index index) {
865872
return getRunner().runAsync(context ->
866873
common.getRecordStoreBuilder().copyBuilder().setContext(context).openAsync()
@@ -1027,11 +1034,8 @@ public CompletableFuture<Void> rebuildIndexAsync(@Nonnull FDBRecordStore store)
10271034
}))
10281035
.thenCompose(vignore -> setIndexingTypeOrThrow(store, false))
10291036
.thenCompose(vignore -> rebuildIndexInternalAsync(store))
1030-
.whenComplete((ignore, ignoreEx) -> {
1031-
for (Index index: common.getTargetIndexes()) {
1032-
clearHeartbeatSingleTarget(store, index);
1033-
}
1034-
});
1037+
// If any of the indexes' heartbeats, for any reason, was not cleared during "mark readable", clear it here
1038+
.whenComplete((ignore, ignoreEx) -> clearHeartbeats(store));
10351039
}
10361040

10371041
abstract CompletableFuture<Void> rebuildIndexInternalAsync(FDBRecordStore store);

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
6666
switch (indexingMethod) {
6767
case SCRUB_REPAIR:
6868
case MUTUAL_BY_RECORDS:
69+
updateHeartbeat(store, index);
6970
return AsyncUtil.DONE;
7071

7172
case BY_RECORDS:
@@ -79,7 +80,11 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
7980
}
8081
validateNonCompetingHeartbeat(iterator.next(), nowMilliseconds());
8182
return true;
82-
}));
83+
}))
84+
.thenApply(ignore -> {
85+
updateHeartbeat(store, index);
86+
return null;
87+
});
8388

8489
default:
8590
throw new IndexingBase.ValidationException("invalid indexing method",
@@ -92,15 +97,15 @@ private void validateNonCompetingHeartbeat(KeyValue kv, long now) {
9297
if (keyTuple.size() < 2) { // expecting 8
9398
return;
9499
}
95-
final UUID otherSessionId = keyTuple.getUUID(keyTuple.size() - 1);
96-
if (!otherSessionId.equals(this.indexerId)) {
100+
final UUID otherIndexerId = keyTuple.getUUID(keyTuple.size() - 1);
101+
if (!otherIndexerId.equals(this.indexerId)) {
97102
try {
98103
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
99104
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
100105
if (age > 0 && age < leaseLength) {
101106
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
102-
.addLogInfo(LogMessageKeys.SESSION_ID, indexerId)
103-
.addLogInfo(LogMessageKeys.EXISTING_SESSION_ID, otherSessionId)
107+
.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
108+
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
104109
.addLogInfo(LogMessageKeys.AGE_MILLISECONDS, age)
105110
.addLogInfo(LogMessageKeys.TIME_LIMIT_MILLIS, leaseLength);
106111
}
@@ -131,13 +136,13 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>>
131136
if (keyTuple.size() < 2) { // expecting 8
132137
return true; // ignore, next
133138
}
134-
final UUID otherSessionId = keyTuple.getUUID(keyTuple.size() - 1);
139+
final UUID otherIndexerId = keyTuple.getUUID(keyTuple.size() - 1);
135140
try {
136141
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
137-
ret.put(otherSessionId, otherHeartbeat);
142+
ret.put(otherIndexerId, otherHeartbeat);
138143
} catch (InvalidProtocolBufferException e) {
139144
// put a NONE heartbeat to indicate an invalid item
140-
ret.put(otherSessionId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
145+
ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
141146
.setMethod(IndexBuildProto.IndexBuildIndexingStamp.Method.NONE)
142147
.build());
143148
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingSubspaces.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,12 @@ public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> sto
100100
* Subspace that stores the indexing heartbeat.
101101
* @param store store
102102
* @param index index
103-
* @param sessionId session id
103+
* @param indexerId session id
104104
* @return subspace
105105
*/
106106
@Nonnull
107-
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index, @Nonnull UUID sessionId) {
108-
return indexheartbeatSubspace(store, index).subspace(Tuple.from(sessionId));
107+
public static Subspace indexheartbeatSubspace(@Nonnull FDBRecordStoreBase<?> store, @Nonnull Index index, @Nonnull UUID indexerId) {
108+
return indexheartbeatSubspace(store, index).subspace(Tuple.from(indexerId));
109109
}
110110

111111
/**
@@ -209,7 +209,7 @@ public static void eraseAllIndexingDataButTheLock(@Nonnull FDBRecordContext cont
209209
eraseAllIndexingScrubbingData(context, store, index);
210210
context.clear(Range.startsWith(indexBuildScannedRecordsSubspace(store, index).pack()));
211211
context.clear(Range.startsWith(indexBuildTypeSubspace(store, index).pack()));
212-
// The heartbeats, unlike the sync lock, may be erased here. If needed, an appropriate heartbeat will be set after the clearing within the same transaction.
212+
// The heartbeats, unlike the sync lock, may be erased here. If needed, an appropriate heartbeat will be set after this clear & within the same transaction.
213213
context.clear(Range.startsWith(indexheartbeatSubspace(store, index).pack()));
214214
}
215215
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1267,7 +1267,7 @@ public static class Builder {
12671267
private DesiredAction ifReadable = DesiredAction.CONTINUE;
12681268
private boolean doAllowUniquePendingState = false;
12691269
private Set<TakeoverTypes> allowedTakeoverSet = null;
1270-
private long checkIndexingStampFrequency = 30_000;
1270+
private long checkIndexingStampFrequency = 10_000;
12711271
private boolean useMutualIndexing = false;
12721272
private List<Tuple> useMutualIndexingBoundaries = null;
12731273
private boolean allowUnblock = false;

fdb-record-layer-core/src/main/proto/index_build.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ message IndexBuildIndexingStamp {
3636
MUTUAL_BY_RECORDS = 5; // scan records, build multiple target indexes, while allowing multiple indexer processes
3737
NONE = 6; // return this stamp in query when null - never to be written in the DB
3838
};
39-
4039
optional Method method = 1;
4140
optional bytes source_index_subspace_key = 2; // relevant only with BY_INDEX method
4241
optional int32 source_index_last_modified_version = 3; // only with BY_INDEX method

0 commit comments

Comments
 (0)