Skip to content

Commit b74cbc2

Browse files
authored
Add support for record validation (#3314)
This PR adds two new facilities in support of record validation: - Scan record keys in a store - Validate record (no repair yet) Record validation is meant to detect cases of missing KVs in the database and (optionally) repair them. Combining the two facilities mentioned above allows scanning a store that may have lost data and repairing its records so that the store is back in a consistent state. Constraints of the solution: - Missing KVs only (no mis-ordered splits or corrupt keys) - Format versions 3 and higher (upgrading data from one format version to another has not been tested) - With and without the split long records option - With and without inline versions Resolves #3313
1 parent 7518da9 commit b74cbc2

20 files changed

+1923
-4
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* DedupCursor.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.cursors;
22+
23+
import com.apple.foundationdb.annotation.API;
24+
import com.apple.foundationdb.async.AsyncUtil;
25+
import com.apple.foundationdb.record.RecordCoreException;
26+
import com.apple.foundationdb.record.RecordCursor;
27+
import com.apple.foundationdb.record.RecordCursorContinuation;
28+
import com.apple.foundationdb.record.RecordCursorProto;
29+
import com.apple.foundationdb.record.RecordCursorResult;
30+
import com.apple.foundationdb.record.RecordCursorVisitor;
31+
import com.apple.foundationdb.tuple.ByteArrayUtil2;
32+
import com.google.protobuf.ByteString;
33+
import com.google.protobuf.InvalidProtocolBufferException;
34+
35+
import javax.annotation.Nonnull;
36+
import javax.annotation.Nullable;
37+
import java.util.Objects;
38+
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.Executor;
40+
import java.util.concurrent.atomic.AtomicReference;
41+
import java.util.function.Function;
42+
43+
/**
44+
* A cursor that deduplicates adjacent elements in the input and only returns unique values.
45+
* An example usage for this kind of cursor is the iteration of KV pairs, returning the primary keys of the records. Since
46+
* there can be multiple KVs per record, we need to filter out the duplicates.
47+
* This cursor takes in an <i>inner</i> cursor factory function and a pair of pack/unpack functions. Those are needed as
48+
* part of the continuation management, as the overall continuation for the cursor needs both the inner cursor continuation
49+
* and the last found value (so that we can compare it at the beginning of the next iteration).
50+
* <p>
51+
* This cursor also assumes that there is always some forward progress made in each iteration (unless the inner cursor
52+
* is exhausted), so that we will get some inner result during every iteration to feed into the continuation.
53+
* <p>
54+
* The cursor assumes that the inner cursor is sorted (the assumption is actually somewhat weaker: that the repeated elements are grouped)
55+
* such that all the elements of a certain repeated value appear in sequence, hence it can remove all but the first, and
56+
* the stored state can be kept to a minimum.
57+
*
58+
* @param <T> the type of elements of the cursor
59+
*/
60+
@API(API.Status.EXPERIMENTAL)
61+
public class DedupCursor<T> implements RecordCursor<T> {
62+
/** Inner cursor. */
63+
@Nonnull
64+
private final RecordCursor<T> inner;
65+
/** The result returned from the cursor. */
66+
@Nullable
67+
private RecordCursorResult<T> nextResult;
68+
/** The last value found (to be compared against the next value generated). */
69+
@Nullable
70+
private T lastValue;
71+
/** The method that can pack a value into a byte[]. */
72+
@Nonnull
73+
private final Function<T, byte[]> packValue;
74+
75+
/**
76+
* Constructor.
77+
* @param innerCursorFactory factory method to create an inner cursor given a continuation
78+
* @param unpackValue a method that can unpack a value from byte array (used to deserialize from a continuation)
79+
* @param packValue a method that can pack a value into a byte array (used to serialize to a continuation). Note that
80+
* this will only be called if the value is NOT null. Null value will be converted internally to null byte[].
81+
* {@code packValue} itself should never return {@code null}.
82+
* @param continuation the cursor continuation (null if none)
83+
*/
84+
@API(API.Status.EXPERIMENTAL)
85+
public DedupCursor(@Nonnull Function<byte[], RecordCursor<T>> innerCursorFactory,
86+
@Nonnull Function<byte[], T> unpackValue,
87+
@Nonnull Function<T, byte[]> packValue,
88+
@Nullable byte[] continuation) {
89+
this.packValue = packValue;
90+
91+
byte[] innerContinuation = null;
92+
if (continuation != null) {
93+
try {
94+
RecordCursorProto.DedupContinuation dedupContinuation = RecordCursorProto.DedupContinuation.parseFrom(continuation);
95+
// the inner is required in the continuation
96+
innerContinuation = dedupContinuation.getInnerContinuation().toByteArray();
97+
if (dedupContinuation.hasLastValue()) {
98+
lastValue = unpackValue.apply(dedupContinuation.getLastValue().toByteArray());
99+
}
100+
} catch (InvalidProtocolBufferException ex) {
101+
throw new RecordCoreException("Error parsing continuation", ex)
102+
.addLogInfo("raw_bytes", ByteArrayUtil2.loggable(continuation));
103+
}
104+
}
105+
inner = innerCursorFactory.apply(innerContinuation);
106+
}
107+
108+
@Nonnull
109+
@Override
110+
public CompletableFuture<RecordCursorResult<T>> onNext() {
111+
if (nextResult != null && !nextResult.hasNext()) {
112+
return CompletableFuture.completedFuture(nextResult);
113+
}
114+
115+
AtomicReference<RecordCursorResult<T>> currentResult = new AtomicReference<>();
116+
return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
117+
currentResult.set(innerResult);
118+
final boolean hasNext = innerResult.hasNext();
119+
// keep looping if we have more records and value is the same as before
120+
return hasNext && Objects.equals(innerResult.get(), lastValue);
121+
}), getExecutor()).thenApply(vignore -> applyResult(currentResult.get()));
122+
}
123+
124+
@Nullable
125+
private RecordCursorResult<T> applyResult(final RecordCursorResult<T> currentResult) {
126+
if (currentResult.hasNext()) {
127+
lastValue = currentResult.get();
128+
nextResult = RecordCursorResult.withNextValue(lastValue,
129+
new DedupCursorContinuation(currentResult.getContinuation(), lastValue));
130+
} else {
131+
if (currentResult.getNoNextReason().isSourceExhausted()) {
132+
nextResult = RecordCursorResult.exhausted(); //continuation not valid here
133+
} else {
134+
nextResult = RecordCursorResult.withoutNextValue(
135+
new DedupCursorContinuation(currentResult.getContinuation(), lastValue),
136+
currentResult.getNoNextReason());
137+
}
138+
}
139+
return nextResult;
140+
}
141+
142+
@Override
143+
public void close() {
144+
inner.close();
145+
}
146+
147+
@Override
148+
public boolean isClosed() {
149+
return inner.isClosed();
150+
}
151+
152+
@Nonnull
153+
@Override
154+
public Executor getExecutor() {
155+
return inner.getExecutor();
156+
}
157+
158+
@Override
159+
public boolean accept(@Nonnull RecordCursorVisitor visitor) {
160+
if (visitor.visitEnter(this)) {
161+
inner.accept(visitor);
162+
}
163+
return visitor.visitLeave(this);
164+
}
165+
166+
/**
167+
* Form a continuation that allows us to restart with the current cursor at its next position.
168+
* The continuation holds on to the state of the internal cursor as well as the last item seen (if any).
169+
*/
170+
private class DedupCursorContinuation implements RecordCursorContinuation {
171+
@Nonnull
172+
private final RecordCursorContinuation innerContinuation;
173+
@Nullable
174+
private final T lastValue;
175+
private byte[] cachedBytes;
176+
177+
private DedupCursorContinuation(@Nonnull RecordCursorContinuation innerContinuation, @Nullable T lastValue) {
178+
this.innerContinuation = innerContinuation;
179+
this.lastValue = lastValue;
180+
}
181+
182+
@Nullable
183+
@Override
184+
public byte[] toBytes() {
185+
if (isEnd()) {
186+
return null;
187+
} else {
188+
//form bytes exactly once
189+
if (cachedBytes == null) {
190+
byte[] lastValuePacked = pack(lastValue);
191+
final RecordCursorProto.DedupContinuation.Builder builder = RecordCursorProto.DedupContinuation.newBuilder()
192+
.setInnerContinuation(innerContinuation.toByteString());
193+
if (lastValuePacked != null) {
194+
builder.setLastValue(ByteString.copyFrom(lastValuePacked));
195+
}
196+
cachedBytes = builder.build().toByteArray();
197+
}
198+
return cachedBytes;
199+
}
200+
}
201+
202+
@Override
203+
public boolean isEnd() {
204+
return innerContinuation.isEnd();
205+
}
206+
207+
private byte[] pack(final T value) {
208+
return (value == null) ? null : packValue.apply(value);
209+
}
210+
}
211+
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public enum LogMessageKeys {
6666
FOUND_INDEX,
6767
KNOWN_LAST_KEY,
6868
SPLIT_NEXT_INDEX("next_index"),
69+
SPLIT_EXPECTED,
70+
SPLIT_FOUND,
6971
SPLIT_REVERSE("reverse"),
7072
READ_LAST_KEY_MICROS,
7173
// protobuf parsing

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import com.apple.foundationdb.record.ScanProperties;
6363
import com.apple.foundationdb.record.TupleRange;
6464
import com.apple.foundationdb.record.cursors.CursorLimitManager;
65+
import com.apple.foundationdb.record.cursors.DedupCursor;
6566
import com.apple.foundationdb.record.cursors.ListCursor;
6667
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
6768
import com.apple.foundationdb.record.logging.LogMessageKeys;
@@ -1100,7 +1101,7 @@ private <M extends Message> CompletableFuture<FDBStoredRecord<M>> deserializeRec
11001101
return CompletableFuture.completedFuture(recordBuilder.build());
11011102
}
11021103
} catch (Exception ex) {
1103-
final LoggableException ex2 = new RecordCoreException("Failed to deserialize record", ex);
1104+
final LoggableException ex2 = new RecordDeserializationException("Failed to deserialize record", ex);
11041105
ex2.addLogInfo(
11051106
subspaceProvider.logKey(), subspaceProvider.toString(context),
11061107
LogMessageKeys.PRIMARY_KEY, primaryKey,
@@ -1218,6 +1219,53 @@ public RecordCursor<FDBStoredRecord<Message>> scanRecords(@Nullable final Tuple
12181219
return scanTypedRecords(serializer, low, high, lowEndpoint, highEndpoint, continuation, scanProperties);
12191220
}
12201221

1222+
@Nonnull
1223+
@Override
1224+
@SuppressWarnings("PMD.CloseResource")
1225+
public RecordCursor<Tuple> scanRecordKeys(@Nullable final byte[] continuation, @Nonnull final ScanProperties scanProperties) {
1226+
if ( ! getFormatVersionEnum().isAtLeast(FormatVersion.RECORD_COUNT_KEY_ADDED)) {
1227+
// This is only tested for version >= 3
1228+
throw new UnsupportedFormatVersionException("scanRecordKeys does not support this format version");
1229+
}
1230+
final RecordMetaData metaData = metaDataProvider.getRecordMetaData();
1231+
final Subspace recordsSubspace = recordsSubspace();
1232+
if (!metaData.isSplitLongRecords() && omitUnsplitRecordSuffix) {
1233+
// In this case there are only "simple" records (single split with no version). Return them directly.
1234+
KeyValueCursor keyValuesCursor = KeyValueCursor.Builder
1235+
.withSubspace(recordsSubspace)
1236+
.setContext(context)
1237+
.setContinuation(continuation)
1238+
.setRange(TupleRange.ALL)
1239+
.setScanProperties(scanProperties)
1240+
.build();
1241+
return keyValuesCursor
1242+
.map(kv -> SplitHelper.unpackKey(recordsSubspace, kv));
1243+
} else {
1244+
Function<byte[], RecordCursor<Tuple>> innerFunction = cont -> {
1245+
// Create the inner cursor
1246+
final KeyValueCursor cursor = KeyValueCursor.Builder
1247+
.withSubspace(recordsSubspace)
1248+
.setContext(context)
1249+
.setContinuation(cont)
1250+
.setRange(TupleRange.ALL)
1251+
// inner cursor should not have return row limit
1252+
.setScanProperties(scanProperties.with(ExecuteProperties::clearReturnedRowLimit))
1253+
.build();
1254+
// map the KV to the primary key
1255+
return cursor.map(kv -> {
1256+
final Tuple keyTuple = recordsSubspace.unpack(kv.getKey());
1257+
Tuple nextKey = keyTuple.popBack(); // Remove index item
1258+
SplitHelper.validatePrimaryKeySuffixNumber(keyTuple);
1259+
return nextKey;
1260+
});
1261+
};
1262+
1263+
return new DedupCursor<>(innerFunction, Tuple::fromBytes, Tuple::pack, continuation)
1264+
// Apply row limit to the outer cursor
1265+
.limitRowsTo(scanProperties.getExecuteProperties().getReturnedRowLimit());
1266+
}
1267+
}
1268+
12211269
@Nonnull
12221270
@SuppressWarnings("PMD.CloseResource")
12231271
public <M extends Message> RecordCursor<FDBStoredRecord<M>> scanTypedRecords(@Nonnull RecordSerializer<M> typedSerializer,
@@ -5474,4 +5522,5 @@ private CompletableFuture<Void> preloadMetaData() {
54745522
}
54755523
}
54765524
}
5525+
54775526
}

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStoreBase.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,27 @@ RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable Tuple low, @Nullable Tupl
884884
@Nullable byte[] continuation,
885885
@Nonnull ScanProperties scanProperties);
886886

887+
/**
888+
* Scan a range and return the record primary keys within it.
889+
* This method will return a cursor that iterates through all the known primary keys in a range regardless of whether
890+
* they consist of a valid record or not. The cursor can be used to find all candidate record primary keys that can
891+
* be (for example) scanned for validation.
892+
* The cursor attempts to account for format versions, split records and inline versions, while
893+
* assuming that there may be inconsistencies in the data (currently, the only inconsistencies accounted for are missing key-value
894+
* pairs)
895+
*
896+
* @param continuation a continuation from a previous scan (null if none)
897+
* @param scanProperties the scan properties to use (reverse is not supported)
898+
*
899+
* @return a cursor of Tuples representing the primary keys of the records in the range
900+
*/
901+
@Nonnull
902+
@API(API.Status.INTERNAL)
903+
default RecordCursor<Tuple> scanRecordKeys(@Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
904+
// for backwards compatibility, allowing implementers to support later
905+
throw new UnsupportedOperationException("scanRecordKeys should be implemented");
906+
}
907+
887908
/**
888909
* Count the number of records in the database in a range.
889910
*

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBTypedRecordStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@ public RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable Tuple low, @Nullab
183183
return untypedStore.scanTypedRecords(typedSerializer, low, high, lowEndpoint, highEndpoint, continuation, scanProperties);
184184
}
185185

186+
@Nonnull
187+
@Override
188+
public RecordCursor<Tuple> scanRecordKeys(@Nullable final byte[] continuation, @Nonnull final ScanProperties scanProperties) {
189+
return untypedStore.scanRecordKeys(continuation, scanProperties);
190+
}
191+
186192
@Nonnull
187193
@Override
188194
public CompletableFuture<Integer> countRecords(@Nullable Tuple low, @Nullable Tuple high, @Nonnull EndpointType lowEndpoint, @Nonnull EndpointType highEndpoint, @Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* RecordDeserializationException.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb;
22+
23+
import com.apple.foundationdb.record.RecordCoreStorageException;
24+
25+
/**
26+
* Exception thrown when a record deserialization fails.
27+
*/
28+
@SuppressWarnings({"serial"})
29+
public class RecordDeserializationException extends RecordCoreStorageException {
30+
public RecordDeserializationException(final String message, final Throwable cause) {
31+
super(message, cause);
32+
}
33+
}

0 commit comments

Comments
 (0)