Skip to content

Commit 3885faa

Browse files
committed
wip
1 parent 4b76a5e commit 3885faa

File tree

3 files changed

+457
-15
lines changed

3 files changed

+457
-15
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import java.util.concurrent.atomic.AtomicInteger;
3838

3939
public class IndexingHeartbeat {
40-
// [prefix, xid] -> [indexing-type, genesis time, heartbeat time]
40+
// [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time]
4141
final UUID indexerId;
4242
final IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod;
4343
final long genesisTimeMilliseconds;
@@ -85,6 +85,7 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
8585
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
8686
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
8787
if (age > 0 && age < leaseLength) {
88+
// For practical reasons, this exception is backward compatible to the Synchronized Lock one
8889
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
8990
.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
9091
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
/*
2+
* testIndexingHeartbeaLowLevel.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb;
22+
23+
import com.apple.foundationdb.record.IndexBuildProto;
24+
import com.apple.foundationdb.record.RecordMetaData;
25+
import com.apple.foundationdb.record.RecordMetaDataBuilder;
26+
import com.apple.foundationdb.record.TestRecords1Proto;
27+
import com.apple.foundationdb.record.metadata.Index;
28+
import com.apple.foundationdb.record.metadata.IndexOptions;
29+
import com.apple.foundationdb.record.metadata.IndexTypes;
30+
import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression;
31+
import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression;
32+
import com.apple.foundationdb.record.metadata.expressions.VersionKeyExpression;
33+
import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath;
34+
import com.apple.foundationdb.record.test.FDBDatabaseExtension;
35+
import com.apple.foundationdb.record.test.TestKeySpace;
36+
import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension;
37+
import com.google.protobuf.Descriptors;
38+
import org.assertj.core.api.Assertions;
39+
import org.junit.jupiter.api.BeforeEach;
40+
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.api.extension.RegisterExtension;
42+
43+
import javax.annotation.Nonnull;
44+
import java.util.ArrayList;
45+
import java.util.Arrays;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.UUID;
49+
import java.util.concurrent.CompletionException;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.stream.Collectors;
52+
53+
import static com.apple.foundationdb.record.metadata.Key.Expressions.concat;
54+
import static com.apple.foundationdb.record.metadata.Key.Expressions.field;
55+
import static org.junit.jupiter.api.Assertions.assertThrows;
56+
57+
public class IndexingHeartbeatLowLevelTest {
58+
@RegisterExtension
59+
final FDBDatabaseExtension dbExtension = new FDBDatabaseExtension();
60+
@RegisterExtension
61+
final TestKeySpacePathManagerExtension pathManager = new TestKeySpacePathManagerExtension(dbExtension);
62+
FDBDatabase fdb;
63+
KeySpacePath path;
64+
FDBRecordStore recordStore;
65+
RecordMetaData metaData;
66+
67+
@BeforeEach
68+
public void setUp() {
69+
final FDBDatabaseFactory factory = dbExtension.getDatabaseFactory();
70+
factory.setInitialDelayMillis(2L);
71+
factory.setMaxDelayMillis(4L);
72+
factory.setMaxAttempts(100);
73+
74+
fdb = dbExtension.getDatabase();
75+
fdb.setAsyncToSyncTimeout(5, TimeUnit.MINUTES);
76+
path = pathManager.createPath(TestKeySpace.RECORD_STORE);
77+
}
78+
79+
FDBRecordContext openContext() {
80+
FDBRecordContext context = fdb.openContext();
81+
FDBRecordStore.Builder builder = createStoreBuilder()
82+
.setContext(context);
83+
recordStore = builder.createOrOpen(FDBRecordStoreBase.StoreExistenceCheck.NONE);
84+
metaData = recordStore.getRecordMetaData();
85+
return context;
86+
}
87+
88+
@Nonnull
89+
private FDBRecordStore.Builder createStoreBuilder() {
90+
return FDBRecordStore.newBuilder()
91+
.setMetaDataProvider(metaData)
92+
.setKeySpacePath(path);
93+
}
94+
95+
void openMetaData(@Nonnull Descriptors.FileDescriptor descriptor, @Nonnull FDBRecordStoreTestBase.RecordMetaDataHook hook) {
96+
RecordMetaDataBuilder metaDataBuilder = RecordMetaData.newBuilder().setRecords(descriptor);
97+
hook.apply(metaDataBuilder);
98+
metaData = metaDataBuilder.getRecordMetaData();
99+
}
100+
101+
void openMetaData(@Nonnull Descriptors.FileDescriptor descriptor) {
102+
openMetaData(descriptor, (metaDataBuilder) -> {
103+
});
104+
}
105+
106+
void openSimpleMetaData() {
107+
openMetaData(TestRecords1Proto.getDescriptor());
108+
}
109+
110+
void openSimpleMetaData(@Nonnull FDBRecordStoreTestBase.RecordMetaDataHook hook) {
111+
openMetaData(TestRecords1Proto.getDescriptor(), hook);
112+
}
113+
114+
protected static FDBRecordStoreTestBase.RecordMetaDataHook allIndexesHook(List<Index> indexes) {
115+
return metaDataBuilder -> {
116+
for (Index index: indexes) {
117+
metaDataBuilder.addIndex("MySimpleRecord", index);
118+
}
119+
} ;
120+
}
121+
122+
void testHeartbeatLowLevel(List<Index> indexes) {
123+
Assertions.assertThat(indexes).hasSizeGreaterThanOrEqualTo(2);
124+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(indexes);
125+
126+
final int count = 10;
127+
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
128+
for (int i = 0; i < count; i++) {
129+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_INDEX, 100 + i);
130+
}
131+
132+
// populate heartbeats
133+
openSimpleMetaData(hook);
134+
try (FDBRecordContext context = openContext()) {
135+
for (var heartbeat : heartbeats) {
136+
heartbeat.updateHeartbeat(recordStore, indexes.get(0));
137+
heartbeat.updateHeartbeat(recordStore, indexes.get(1));
138+
}
139+
context.commit();
140+
}
141+
142+
// Verify query/clear operation
143+
openSimpleMetaData(hook);
144+
Index index = indexes.get(0);
145+
try (FDBRecordContext context = openContext()) {
146+
// Query, unlimited
147+
Map<UUID, IndexBuildProto.IndexBuildHeartbeat> queried =
148+
IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
149+
Assertions.assertThat(queried).hasSize(count);
150+
Assertions.assertThat(queried.keySet())
151+
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(heartbeat -> heartbeat.indexerId).collect(Collectors.toList()));
152+
153+
// Query, partial
154+
queried =
155+
IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 5).join();
156+
Assertions.assertThat(queried).hasSize(5);
157+
158+
// clear, partial
159+
int countDeleted =
160+
IndexingHeartbeat.clearIndexingHeartbeats(recordStore, index, 0, 7).join();
161+
Assertions.assertThat(countDeleted).isEqualTo(7);
162+
queried =
163+
IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 5).join();
164+
Assertions.assertThat(queried).hasSize(3);
165+
context.commit();
166+
}
167+
168+
// Verify that the previous clear does not affect other index
169+
openSimpleMetaData(hook);
170+
index = indexes.get(1);
171+
try (FDBRecordContext context = openContext()) {
172+
Map<UUID, IndexBuildProto.IndexBuildHeartbeat> queried =
173+
IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 100).join();
174+
Assertions.assertThat(queried).hasSize(count);
175+
Assertions.assertThat(queried.keySet())
176+
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.indexerId).collect(Collectors.toList()));
177+
178+
// clear all
179+
int countDeleted =
180+
IndexingHeartbeat.clearIndexingHeartbeats(recordStore, index, 0, 0).join();
181+
Assertions.assertThat(countDeleted).isEqualTo(count);
182+
183+
// verify empty
184+
queried =
185+
IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
186+
Assertions.assertThat(queried).isEmpty();
187+
context.commit();
188+
}
189+
}
190+
191+
@Test
192+
void testHeartbeatLowLevelValueIndexes() {
193+
List<Index> indexes = new ArrayList<>();
194+
indexes.add(new Index("indexA", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS));
195+
indexes.add(new Index("indexB", field("num_value_3_indexed"), IndexTypes.VALUE));
196+
testHeartbeatLowLevel(indexes);
197+
}
198+
199+
@Test
200+
void testHeartbeatLowLevelSumCountIndexes() {
201+
List<Index> indexes = new ArrayList<>();
202+
indexes.add(new Index("indexE", field("num_value_3_indexed").ungrouped(), IndexTypes.SUM));
203+
indexes.add(new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT));
204+
testHeartbeatLowLevel(indexes);
205+
}
206+
207+
@Test
208+
void testHeartbeatLowLevelVersionIndexes() {
209+
List<Index> indexes = new ArrayList<>();
210+
indexes.add(new Index("versionIndex1", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION));
211+
indexes.add(new Index("versionIndex2", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION));
212+
testHeartbeatLowLevel(indexes);
213+
}
214+
215+
@Test
216+
void testCheckAndUpdateByRecords() {
217+
Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT);
218+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
219+
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(30));
220+
221+
// Successfully update heartbeat
222+
openSimpleMetaData(hook);
223+
try (FDBRecordContext context = openContext()) {
224+
heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join();
225+
context.commit();
226+
}
227+
228+
// Successfully update heartbeat
229+
openSimpleMetaData(hook);
230+
try (FDBRecordContext context = openContext()) {
231+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
232+
Assertions.assertThat(existingHeartbeats).hasSize(1);
233+
heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join();
234+
context.commit();
235+
}
236+
237+
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(30));
238+
Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId);
239+
// Fail to create another 'BY_RECORD` heartbeat
240+
openSimpleMetaData(hook);
241+
try (FDBRecordContext context = openContext()) {
242+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
243+
Assertions.assertThat(existingHeartbeats).hasSize(1);
244+
final CompletionException ex = assertThrows(CompletionException.class, () -> heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join());
245+
Assertions.assertThat(ex.getMessage()).contains("SynchronizedSessionLockedException");
246+
context.commit();
247+
}
248+
249+
// Successfully clear heartbeat1
250+
openSimpleMetaData(hook);
251+
try (FDBRecordContext context = openContext()) {
252+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
253+
Assertions.assertThat(existingHeartbeats).hasSize(1);
254+
heartbeat1.clearHeartbeat(recordStore, index);
255+
context.commit();
256+
}
257+
258+
// Successfully update heartbeat2
259+
openSimpleMetaData(hook);
260+
try (FDBRecordContext context = openContext()) {
261+
heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join();
262+
context.commit();
263+
}
264+
265+
// Successfully clear heartbeat2
266+
openSimpleMetaData(hook);
267+
try (FDBRecordContext context = openContext()) {
268+
heartbeat2.clearHeartbeat(recordStore, index);
269+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
270+
Assertions.assertThat(existingHeartbeats).isEmpty();
271+
context.commit();
272+
}
273+
}
274+
275+
@Test
276+
void testCheckAndUpdateMutual() {
277+
Index index = new Index("indexD", new GroupingKeyExpression(EmptyKeyExpression.EMPTY, 0), IndexTypes.COUNT);
278+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
279+
280+
final int count = 10;
281+
IndexingHeartbeat[] heartbeats = new IndexingHeartbeat[count];
282+
for (int i = 0; i < count; i++) {
283+
heartbeats[i] = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.MUTUAL_BY_RECORDS, TimeUnit.SECONDS.toMillis(100));
284+
}
285+
286+
// Successfully check//update all heartbeats
287+
openSimpleMetaData(hook);
288+
try (FDBRecordContext context = openContext()) {
289+
for (IndexingHeartbeat heartbeat: heartbeats) {
290+
heartbeat.checkAndUpdateHeartbeat(recordStore, index).join();
291+
}
292+
context.commit();
293+
}
294+
295+
// Check count, clear all
296+
openSimpleMetaData(hook);
297+
try (FDBRecordContext context = openContext()) {
298+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
299+
Assertions.assertThat(existingHeartbeats).hasSize(count);
300+
301+
for (IndexingHeartbeat heartbeat: heartbeats) {
302+
heartbeat.clearHeartbeat(recordStore, index);
303+
}
304+
context.commit();
305+
}
306+
307+
// verify cleared
308+
openSimpleMetaData(hook);
309+
try (FDBRecordContext context = openContext()) {
310+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
311+
Assertions.assertThat(existingHeartbeats).isEmpty();
312+
context.commit();
313+
}
314+
}
315+
316+
@Test
317+
void testExpiredHeartbeat() throws InterruptedException {
318+
Index index = new Index("versionIndex1", concat(field("num_value_2"), VersionKeyExpression.VERSION), IndexTypes.VERSION);
319+
FDBRecordStoreTestBase.RecordMetaDataHook hook = allIndexesHook(List.of(index));
320+
IndexingHeartbeat heartbeat1 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, TimeUnit.SECONDS.toMillis(10));
321+
322+
// Successfully update heartbeat1
323+
openSimpleMetaData(hook);
324+
try (FDBRecordContext context = openContext()) {
325+
heartbeat1.checkAndUpdateHeartbeat(recordStore, index).join();
326+
context.commit();
327+
}
328+
329+
// Delay 20, set heartbeat2's lease to 4
330+
Thread.sleep(20);
331+
IndexingHeartbeat heartbeat2 = new IndexingHeartbeat(UUID.randomUUID(), IndexBuildProto.IndexBuildIndexingStamp.Method.BY_RECORDS, 4);
332+
Assertions.assertThat(heartbeat1.indexerId).isNotEqualTo(heartbeat2.indexerId);
333+
334+
// heartbeat2 successfully takes over
335+
openSimpleMetaData(hook);
336+
try (FDBRecordContext context = openContext()) {
337+
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> existingHeartbeats = IndexingHeartbeat.getIndexingHeartbeats(recordStore, index, 0).join();
338+
Assertions.assertThat(existingHeartbeats).hasSize(1);
339+
heartbeat2.checkAndUpdateHeartbeat(recordStore, index).join();
340+
context.commit();
341+
}
342+
}
343+
}

0 commit comments

Comments
 (0)