Skip to content

Commit e71bbd3

Browse files
committed
Add 1st test - low level
1 parent e023ee7 commit e71bbd3

File tree

5 files changed

+113
-12
lines changed

5 files changed

+113
-12
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ boolean performIndexingStampOperation(@Nonnull ConcurrentHashMap<String, IndexBu
11021102
return true;
11031103
}
11041104

1105-
public CompletableFuture<Map<UUID, IndexBuildProto.IndexingHeartbeat>> getIndexingHeartbeats(int maxCount) {
1105+
public CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(int maxCount) {
11061106
return getRunner().runAsync(context -> openRecordStore(context)
11071107
.thenCompose(store -> IndexingHeartbeat.getIndexingHeartbeats(store, common.getPrimaryIndex(), maxCount)));
11081108
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public IndexingHeartbeat(final UUID sessionId, IndexBuildProto.IndexBuildIndexin
5353

5454
public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
5555
byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, sessionId).pack();
56-
byte[] value = IndexBuildProto.IndexingHeartbeat.newBuilder()
56+
byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
5757
.setMethod(indexingMethod)
5858
.setGenesisTimeMilliseconds(genesisTimeMilliseconds)
5959
.setHeartbeatTimeMilliseconds(nowMilliseconds())
@@ -95,7 +95,7 @@ private void validateNonCompetingHeartbeat(KeyValue kv, long now) {
9595
final UUID otherSessionId = keyTuple.getUUID(keyTuple.size() - 1);
9696
if (!otherSessionId.equals(this.sessionId)) {
9797
try {
98-
final IndexBuildProto.IndexingHeartbeat otherHeartbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
98+
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
9999
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
100100
if (age > 0 && age < leaseLength) {
101101
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
@@ -114,8 +114,8 @@ public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index)
114114
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, sessionId).pack());
115115
}
116116

117-
public static CompletableFuture<Map<UUID, IndexBuildProto.IndexingHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
118-
final Map<UUID, IndexBuildProto.IndexingHeartbeat> ret = new HashMap<>();
117+
public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
118+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> ret = new HashMap<>();
119119
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
120120
final AtomicInteger iterationCount = new AtomicInteger(0);
121121
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
@@ -133,11 +133,11 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexingHeartbeat>> ge
133133
}
134134
final UUID otherSessionId = keyTuple.getUUID(keyTuple.size() - 1);
135135
try {
136-
final IndexBuildProto.IndexingHeartbeat otherHeartbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
136+
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
137137
ret.put(otherSessionId, otherHeartbeat);
138138
} catch (InvalidProtocolBufferException e) {
139139
// put a NONE heartbeat to indicate an invalid item
140-
ret.put(otherSessionId, IndexBuildProto.IndexingHeartbeat.newBuilder()
140+
ret.put(otherSessionId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
141141
.setMethod(IndexBuildProto.IndexBuildIndexingStamp.Method.NONE)
142142
.build());
143143
}
@@ -162,9 +162,9 @@ public static CompletableFuture<Integer> clearIndexingHeartbeats(@Nonnull FDBRec
162162
final KeyValue kv = iterator.next();
163163
boolean shouldRemove;
164164
try {
165-
final IndexBuildProto.IndexingHeartbeat otherHeartbeat = IndexBuildProto.IndexingHeartbeat.parseFrom(kv.getValue());
165+
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
166166
// remove heartbeat if too old
167-
shouldRemove = now + minAgenMilliseconds <= otherHeartbeat.getHeartbeatTimeMilliseconds();
167+
shouldRemove = now + minAgenMilliseconds >= otherHeartbeat.getHeartbeatTimeMilliseconds();
168168
} catch (InvalidProtocolBufferException e) {
169169
// remove heartbeat if invalid
170170
shouldRemove = true;

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,10 +600,10 @@ private Map<String, IndexBuildProto.IndexBuildIndexingStamp> indexingStamp(@Null
600600
/**
601601
* Get the current indexing heartbeats for a given index (single target or primary index).
602602
* @param maxCount safety valve to limit number items to read. Typically set to zero to keep unlimited.
603-
* @return map of session ids to {@link IndexBuildProto.IndexingHeartbeat}
603+
* @return map of session ids to {@link IndexBuildProto.IndexBuildHeartbeat}
604604
*/
605605
@API(API.Status.EXPERIMENTAL)
606-
public Map<UUID, IndexBuildProto.IndexingHeartbeat> getIndexingHeartbeats(int maxCount) {
606+
public Map<UUID, IndexBuildProto.IndexBuildHeartbeat> getIndexingHeartbeats(int maxCount) {
607607
return asyncToSync(FDBStoreTimer.Waits.WAIT_INDEX_READ_HEARTBEATS,
608608
getIndexer().getIndexingHeartbeats(maxCount));
609609
}

fdb-record-layer-core/src/main/proto/index_build.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ message IndexBuildIndexingStamp {
4747
optional string blockID = 7; // optional, a short string that describes the reason for the block.
4848
}
4949

50-
message IndexingHeartbeat {
50+
message IndexBuildHeartbeat {
5151
required IndexBuildIndexingStamp.Method method = 1;
5252
required int64 genesisTimeMilliseconds = 2;
5353
required int64 heartbeatTimeMilliseconds = 3;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* OnlineIndexingHeartbeatTest.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb;
22+
23+
import com.apple.foundationdb.record.IndexBuildProto;
24+
import com.apple.foundationdb.record.metadata.Index;
25+
import com.apple.foundationdb.record.metadata.IndexOptions;
26+
import com.apple.foundationdb.record.metadata.IndexTypes;
27+
import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression;
28+
import org.assertj.core.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.UUID;
36+
import java.util.stream.Collectors;
37+
38+
import static com.apple.foundationdb.record.metadata.Key.Expressions.field;
39+
40+
/**
41+
* Verify indexing heartbeat activity (query & clear).
42+
*/
43+
public class OnlineIndexingHeartbeatTest extends OnlineIndexerTest {
44+
@Test
45+
void testHeartbeatLowLevel() {
46+
List<Index> indexes = new ArrayList<>();
47+
indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS));
48+
indexes.add(new Index("indexB", field("num_value_3_indexed"), IndexTypes.VALUE));
49+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes);
50+
51+
final int count = 10;
52+
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
53+
for (int i = 0; i < count; i++) {
54+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_INDEX, 100 + i);
55+
}
56+
57+
openSimpleMetaData(hook);
58+
try (FDBRecordContext context = openContext()) {
59+
for (var heartbeat : heartbeats) {
60+
heartbeat.updateHeartbeat(recordStore, indexes.get(0));
61+
heartbeat.updateHeartbeat(recordStore, indexes.get(1));
62+
}
63+
context.commit();
64+
}
65+
66+
// Verify query/clear operation
67+
try (OnlineIndexer indexer = newIndexerBuilder(indexes.get(0)).build()) {
68+
// Query, unlimited
69+
Map<UUID, IndexBuildProto.IndexBuildHeartbeat> queried = indexer.getIndexingHeartbeats(0);
70+
Assertions.assertThat(queried).hasSize(count);
71+
Assertions.assertThat(queried.keySet())
72+
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.sessionId).collect(Collectors.toList()));
73+
74+
// Query, partial
75+
queried = indexer.getIndexingHeartbeats(5);
76+
Assertions.assertThat(queried).hasSize(5);
77+
78+
// clear, partial
79+
int countDeleted = indexer.clearIndexingHeartbeats(0, 7);
80+
Assertions.assertThat(countDeleted).isEqualTo(7);
81+
queried = indexer.getIndexingHeartbeats(5);
82+
Assertions.assertThat(queried).hasSize(3);
83+
}
84+
85+
// Verify that the previous clear does not affect other index
86+
try (OnlineIndexer indexer = newIndexerBuilder(indexes.get(1)).build()) {
87+
Map<UUID, IndexBuildProto.IndexBuildHeartbeat> queried = indexer.getIndexingHeartbeats(100);
88+
Assertions.assertThat(queried).hasSize(count);
89+
Assertions.assertThat(queried.keySet())
90+
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.sessionId).collect(Collectors.toList()));
91+
92+
// clear all
93+
int countDeleted = indexer.clearIndexingHeartbeats(0, 0);
94+
Assertions.assertThat(countDeleted).isEqualTo(count);
95+
96+
// verify empty
97+
queried = indexer.getIndexingHeartbeats(0);
98+
Assertions.assertThat(queried).isEmpty();
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)