Skip to content

Commit 4cf86f4

Browse files
committed
wip
1 parent 3885faa commit 4cf86f4

File tree

6 files changed

+71
-35
lines changed

6 files changed

+71
-35
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4966,6 +4966,7 @@ public void removeFormerIndex(FormerIndex formerIndex) {
49664966

49674967
private void clearReadableIndexBuildData(Index index) {
49684968
IndexingRangeSet.forIndexBuild(this, index).clear();
4969+
IndexingHeartbeat.clearAllHeartbeats(this, index);
49694970
}
49704971

49714972
@SuppressWarnings("PMD.CloseResource")

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,11 @@ public void enforceStampOverwrite() {
361361
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
362362
// continuedBuild is set if this session isn't a continuation of a previous indexing
363363
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);
364-
heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod(), common.config.getLeaseLengthMillis());
364+
final IndexBuildProto.IndexBuildIndexingStamp.Method method = indexingTypeStamp.getMethod();
365+
boolean allowMutual =
366+
method == IndexBuildProto.IndexBuildIndexingStamp.Method.MUTUAL_BY_RECORDS ||
367+
method == IndexBuildProto.IndexBuildIndexingStamp.Method.SCRUB_REPAIR;
368+
heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod().toString(), common.config.getLeaseLengthMillis(), allowMutual);
365369

366370
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp)
367371
.thenCompose(ignore -> updateHeartbeat(true, store, index)));

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,23 @@
3939
public class IndexingHeartbeat {
4040
// [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time]
4141
final UUID indexerId;
42-
final IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod;
42+
final String info;
4343
final long genesisTimeMilliseconds;
4444
final long leaseLength;
45+
final boolean allowMutual;
4546

46-
public IndexingHeartbeat(final UUID indexerId, IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod, long leaseLength) {
47+
public IndexingHeartbeat(final UUID indexerId, String info, long leaseLength, boolean allowMutual) {
4748
this.indexerId = indexerId;
48-
this.indexingMethod = indexingMethod;
49+
this.info = info;
4950
this.leaseLength = leaseLength;
51+
this.allowMutual = allowMutual;
5052
this.genesisTimeMilliseconds = nowMilliseconds();
5153
}
5254

5355
public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
5456
byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack();
5557
byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
56-
.setMethod(indexingMethod)
58+
.setInfo(info)
5759
.setGenesisTimeMilliseconds(genesisTimeMilliseconds)
5860
.setHeartbeatTimeMilliseconds(nowMilliseconds())
5961
.build().toByteArray();
@@ -62,18 +64,14 @@ public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index)
6264

6365
public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
6466
// complete exceptionally if non-mutual, other exists
65-
switch (indexingMethod) {
66-
case SCRUB_REPAIR:
67-
case MUTUAL_BY_RECORDS:
68-
updateHeartbeat(store, index);
69-
return AsyncUtil.DONE;
67+
if (allowMutual) {
68+
updateHeartbeat(store, index);
69+
return AsyncUtil.DONE;
70+
}
7071

71-
case BY_RECORDS:
72-
case MULTI_TARGET_BY_RECORDS:
73-
case BY_INDEX:
74-
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
75-
final long now = nowMilliseconds();
76-
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
72+
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
73+
final long now = nowMilliseconds();
74+
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
7775
.thenApply(hasNext -> {
7876
if (!hasNext) {
7977
return false;
@@ -98,21 +96,20 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
9896
}
9997
return true;
10098
}))
101-
.thenApply(ignore -> {
102-
updateHeartbeat(store, index);
103-
return null;
104-
});
105-
106-
default:
107-
throw new IndexingBase.ValidationException("invalid indexing method",
108-
LogMessageKeys.INDEXING_METHOD, indexingMethod);
109-
}
99+
.thenApply(ignore -> {
100+
updateHeartbeat(store, index);
101+
return null;
102+
});
110103
}
111104

112105
public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
113106
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack());
114107
}
115108

109+
public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) {
110+
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index).range());
111+
}
112+
116113
public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
117114
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> ret = new HashMap<>();
118115
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
@@ -131,9 +128,9 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>>
131128
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
132129
ret.put(otherIndexerId, otherHeartbeat);
133130
} catch (InvalidProtocolBufferException e) {
134-
// put a NONE heartbeat to indicate an invalid item
131+
// Let the caller know about this invalid heartbeat.
135132
ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
136-
.setMethod(IndexBuildProto.IndexBuildIndexingStamp.Method.NONE)
133+
.setInfo("<< Invalid Heartbeat >>")
137134
.build());
138135
}
139136
return true;

fdb-record-layer-core/src/main/proto/index_build.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ message IndexBuildIndexingStamp {
4747
}
4848

4949
message IndexBuildHeartbeat {
50-
required IndexBuildIndexingStamp.Method method = 1;
50+
required string info = 1;
5151
required int64 genesisTimeMilliseconds = 2;
5252
required int64 heartbeatTimeMilliseconds = 3;
5353
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeatLowLevelTest.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ void testHeartbeatLowLevel(List<Index> indexes) {
126126
final int count = 10;
127127
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
128128
for (int i = 0; i < count; i++) {
129-
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_INDEX, 100 + i);
129+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "test", 100 + i, false);
130130
}
131131

132132
// populate heartbeats
@@ -216,7 +216,7 @@ void testHeartbeatLowLevelVersionIndexes() {
216216
void testCheckAndUpdateByRecords() {
217217
Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT);
218218
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
219-
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(30));
219+
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(30), false);
220220

221221
// Successfully update heartbeat
222222
openSimpleMetaData(hook);
@@ -234,7 +234,7 @@ void testCheckAndUpdateByRecords() {
234234
context.commit();
235235
}
236236

237-
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(30));
237+
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(30), false);
238238
Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId);
239239
// Fail to create another 'BY_RECORD` heartbeat
240240
openSimpleMetaData(hook);
@@ -280,7 +280,7 @@ void testCheckAndUpdateMutual() {
280280
final int count = 10;
281281
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
282282
for (int i = 0; i < count; i++) {
283-
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.MUTUAL_BY_RECORDS, TimeUnit.SECONDS.toMillis(100));
283+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "Mutual", TimeUnit.SECONDS.toMillis(100), true);
284284
}
285285

286286
// Successfully check//update all heartbeats
@@ -317,7 +317,7 @@ void testCheckAndUpdateMutual() {
317317
void testExpiredHeartbeat() throws InterruptedException {
318318
Index index = new Index("versionIndex1", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION);
319319
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
320-
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(10));
320+
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), "Test", TimeUnit.SECONDS.toMillis(10), false);
321321

322322
// Successfully update heartbeat1
323323
openSimpleMetaData(hook);
@@ -328,7 +328,7 @@ void testExpiredHeartbeat() throws InterruptedException {
328328

329329
// Delay 20, set heartbeat2's lease to 4
330330
Thread.sleep(20);
331-
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, 4);
331+
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), "Test", 4, false);
332332
Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId);
333333

334334
// heartbeat2 successfully takes over
@@ -340,4 +340,37 @@ void testExpiredHeartbeat() throws InterruptedException {
340340
context.commit();
341341
}
342342
}
343+
344+
@Test
345+
void testHeartbeatExpiration() throws InterruptedException {
346+
Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT);
347+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
348+
349+
final IndexingHeartbeat heartbeatA = new IndexingHeartbeat(UUID.randomUUID(), "a", 500, false);
350+
final IndexingHeartbeat heartbeatB = new IndexingHeartbeat(UUID.randomUUID(), "b", 5, false);
351+
352+
// Set heartbeat A
353+
openSimpleMetaData(hook);
354+
try (FDBRecordContext context = openContext()) {
355+
heartbeatA.checkAndUpdateHeartbeat(recordStore, index).join();
356+
context.commit();
357+
}
358+
359+
Thread.sleep(100);
360+
// Expect heartbeatA to expire after 5 milliseconds, and successfully set heartbeatB
361+
openSimpleMetaData(hook);
362+
try (FDBRecordContext context = openContext()) {
363+
heartbeatB.checkAndUpdateHeartbeat(recordStore, index).join();
364+
context.commit();
365+
}
366+
367+
// Expect heartbeatA to fail check/update
368+
// Note: if become flakey, increase the least time of heartbeatA
369+
openSimpleMetaData(hook);
370+
try (FDBRecordContext context = openContext()) {
371+
final CompletionException ex = assertThrows(CompletionException.class, () -> heartbeatA.checkAndUpdateHeartbeat(recordStore, index).join());
372+
Assertions.assertThat(ex.getMessage()).contains("SynchronizedSessionLockedException");
373+
context.commit();
374+
}
375+
}
343376
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexingHeartbeatTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ void testHeartbeatLowLevel() {
6363
final int count = 10;
6464
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
6565
for (int i = 0; i < count; i++) {
66-
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_INDEX, 100 + i);
66+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), "Test", 100 + i, true);
6767
}
6868

6969
openSimpleMetaData(hook);
@@ -235,6 +235,7 @@ void testMutualIndexersHeartbeatsClearAfterBuild() throws InterruptedException {
235235
if (i == 4) {
236236
try {
237237
startSemaphore.acquire();
238+
Thread.sleep(100);
238239
} catch (InterruptedException e) {
239240
throw new RuntimeException(e);
240241
}

0 commit comments

Comments
 (0)