diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java index 5153b00f09..88f4af97d1 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java @@ -138,6 +138,7 @@ public enum LogMessageKeys { END_TUPLE, REAL_END, RECORDS_SCANNED, + RECORDS_DELETED, ORIGINAL_RANGE, SPLIT_RANGES, REASON, @@ -312,6 +313,7 @@ public enum LogMessageKeys { TOTAL_RECORDS_SCANNED, TOTAL_RECORDS_SCANNED_DURING_FAILURES, SCRUB_TYPE, + RETRY_COUNT, // time limits milliseconds TIME_LIMIT_MILLIS("time_limit_milliseconds"), diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabase.java index af9f4b45da..1e21000e9b 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabase.java @@ -740,7 +740,13 @@ public Executor getExecutor() { return factory.getExecutor(); } - protected Executor newContextExecutor(@Nullable Map mdcContext) { + /** + * Create a new executor for the database. This is used internally when creating a transaction or a new runner. 
+ * @param mdcContext if present, the MDC context to be made available within the executor's threads + * @return the new executor, as created by this database's factory + */ + @API(API.Status.INTERNAL) + public Executor newContextExecutor(@Nullable Map mdcContext) { return factory.newContextExecutor(mdcContext); } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/CursorFactory.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/CursorFactory.java new file mode 100644 index 0000000000..f4a084276d --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/CursorFactory.java @@ -0,0 +1,49 @@ +/* + * CursorFactory.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.cursors.throttled; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Create a cursor with the given store and last result.
+ * This factory method is used by the {@link ThrottledRetryingIterator} to create inner cursors when needed. + * The iterator creates transactions based off of the constraints given, and for each such transaction, a new inner + * cursor gets created. + * @param the type of item the cursor iterates over. + */ +@API(API.Status.EXPERIMENTAL) +@FunctionalInterface +public interface CursorFactory { + /** + * Create a new inner cursor for the {@link ThrottledRetryingIterator}. + * @param store the record store to use + * @param lastResult the last result processed by the previous cursor (use it to obtain the continuation), or {@code null} if there is none. + * @param rowLimit the adjusted row limit to use + * @return a newly created cursor with the given continuation and limit + */ + RecordCursor createCursor(@Nonnull FDBRecordStore store, @Nullable RecordCursorResult lastResult, int rowLimit); +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ItemHandler.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ItemHandler.java new file mode 100644 index 0000000000..2a4f42b418 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ItemHandler.java @@ -0,0 +1,51 @@ +/* + * ItemHandler.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.cursors.throttled; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; + +import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + +/** + * A handler of an item during the iteration of a {@link ThrottledRetryingIterator}. + * @param the type of element in the iteration + */ +@API(API.Status.EXPERIMENTAL) +@FunctionalInterface +public interface ItemHandler { + /** + * Process an item. + * Once done processing, complete the returned future; the iteration then continues unless the handler has marked the source as exhausted. + * The quota manager holds the current state of the iteration (per the current transaction). The handler can + * change the state via {@link ThrottledRetryingIterator.QuotaManager#deleteCountAdd(int)}, + * {@link ThrottledRetryingIterator.QuotaManager#deleteCountInc()} and + * {@link ThrottledRetryingIterator.QuotaManager#markExhausted()}.
+ * @param store the record store to use + * @param lastResult the result to process + * @param quotaManager the current quota manager state + * @return a future that completes when the handling of this item is done + */ + @Nonnull + CompletableFuture handleOneItem(FDBRecordStore store, RecordCursorResult lastResult, ThrottledRetryingIterator.QuotaManager quotaManager); +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledRetryingIterator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledRetryingIterator.java new file mode 100644 index 0000000000..037259e6cd --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledRetryingIterator.java @@ -0,0 +1,551 @@ +/* + * ThrottledRetryingIterator.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.apple.foundationdb.record.provider.foundationdb.cursors.throttled; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.async.MoreAsyncUtil; +import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.logging.KeyValueLogMessage; +import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; +import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContextConfig; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.runners.FutureAutoClose; +import com.apple.foundationdb.record.provider.foundationdb.runners.TransactionalRunner; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * An iterator that can handle resource constraints and failures. + * This class iterates over an inner cursor, applying resource controls (# of ops per transaction and per time), and + * retrying failed operations. The iterator will build its own transactions and stores so that it can handle long-running + * operations. + *

+ * The iterator currently controls Read and Delete operations. If any other use case is required, it can + * be extended by adding additional limits per transaction/second. + * + * @param The iterated item type + */ +@API(API.Status.EXPERIMENTAL) +public class ThrottledRetryingIterator implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ThrottledRetryingIterator.class); + + public static final int NUMBER_OF_RETRIES = 100; + private static final int SUCCESS_INCREASE_THRESHOLD = 40; + + @Nonnull + private final TransactionalRunner transactionalRunner; + @Nonnull + private final Executor executor; + @Nonnull + private final ScheduledExecutorService scheduledExecutor; + @Nonnull + private final FutureAutoClose futureManager; + + private final int transactionTimeQuotaMillis; + private final int maxRecordDeletesPerTransaction; + private final int maxRecordScannedPerSec; + private final int maxRecordDeletesPerSec; + @Nonnull + private final CursorFactory cursorCreator; + @Nonnull + private final ItemHandler singleItemHandler; + @Nullable + private final Consumer transactionSuccessNotification; + @Nullable + private final Consumer transactionInitNotification; + private final int numOfRetries; + + private boolean closed = false; + /** Starting time of the current/most-recent transaction. */ + private long rangeIterationStartTimeMilliseconds = 0; + /** Cursor limit in a single transaction (throttled). */ + private int cursorRowsLimit; + /** Number of consecutive failed transactions; reset at each success. */ + private int failureRetriesCounter = 0; + /** Number of successful transactions since the last failure; reset on each failure.
*/ + private int successCounter = 0; + + public ThrottledRetryingIterator(Builder builder) { + this.transactionalRunner = builder.transactionalRunner; + this.executor = builder.executor; + this.scheduledExecutor = builder.scheduledExecutor; + this.cursorCreator = builder.cursorCreator; + this.singleItemHandler = builder.singleItemHandler; + this.transactionTimeQuotaMillis = builder.transactionTimeQuotaMillis; + this.maxRecordDeletesPerTransaction = builder.maxRecordDeletesPerTransaction; + this.maxRecordScannedPerSec = builder.maxRecordScannedPerSec; + this.maxRecordDeletesPerSec = builder.maxRecordDeletesPerSec; + this.transactionSuccessNotification = builder.transactionSuccessNotification; + this.transactionInitNotification = builder.transactionInitNotification; + this.cursorRowsLimit = 0; + this.numOfRetries = builder.numOfRetries; + futureManager = new FutureAutoClose(); + } + + /** + * Iterate over the inner cursor. + *

+ * This is the main entry point for the class: This method returns a future that, when completed normally, signifies the + * completion of the iteration over the inner cursor. The iteration will create its own transactions for the actual + * data access, and so this can be done outside the scope of a transaction. + * @param storeBuilder the store builder to use for the iteration + * @return a future that, when completed normally, means the iteration is complete + */ + public CompletableFuture iterateAll(final FDBRecordStore.Builder storeBuilder) { + final AtomicReference> lastSuccessCont = new AtomicReference<>(null); + final QuotaManager singleIterationQuotaManager = new QuotaManager(); + return AsyncUtil.whileTrue(() -> + // iterate ranges + iterateOneRange(storeBuilder, lastSuccessCont.get(), singleIterationQuotaManager) + .handle((continuation, ex) -> { + if (ex == null) { + lastSuccessCont.set(continuation); + return handleSuccess(singleIterationQuotaManager); + } + return handleFailure(ex, singleIterationQuotaManager); + }) + .thenCompose(ret -> ret), + executor); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + // Ensure we call both close() methods, capturing all exceptions + RuntimeException caught = null; + try { + futureManager.close(); + } catch (RuntimeException e) { + caught = e; + } + try { + transactionalRunner.close(); + } catch (RuntimeException e) { + if (caught != null) { + caught.addSuppressed(e); + } else { + caught = e; + } + } + if (caught != null) { + throw caught; + } + } + + /** + * Run a single transaction. + * Start a transaction and iterate until done: Either source exhausted, error occurred or constraint reached.
+ * + * @param userStoreBuilder store builder to create new stores + * @param cursorStartPoint the last result (from which continuation can be extracted) + * @param singleIterationQuotaManager instance of quota manager to use + * @return a future of the last cursor result obtained + */ + private CompletableFuture> iterateOneRange(FDBRecordStore.Builder userStoreBuilder, + RecordCursorResult cursorStartPoint, + QuotaManager singleIterationQuotaManager) { + AtomicReference> cont = new AtomicReference<>(); + + return transactionalRunner.runAsync(true, transaction -> { + // this layer returns last cursor result + singleIterationQuotaManager.init(); + + runUnlessNull(transactionInitNotification, singleIterationQuotaManager); // let the user know about this range iteration attempt + final FDBRecordStore store = userStoreBuilder.setContext(transaction).build(); + RecordCursor cursor = cursorCreator.createCursor(store, cursorStartPoint, cursorRowsLimit); + rangeIterationStartTimeMilliseconds = nowMillis(); + + return AsyncUtil.whileTrue(() -> cursor.onNext() + .thenCompose(result -> { + cont.set(result); + if (!result.hasNext()) { + if (result.getNoNextReason().isSourceExhausted()) { + // terminate the iteration + singleIterationQuotaManager.hasMore = false; + } + // end of this one range + return AsyncUtil.READY_FALSE; + } + singleIterationQuotaManager.scannedCount++; + CompletableFuture future = singleItemHandler.handleOneItem(store, result, singleIterationQuotaManager); + // Register the externally-provided future so that it is closed if the runner is closed before it completes + return futureManager.registerFuture(future) + .thenApply(ignore -> singleIterationQuotaManager.hasMore); + }) + .thenApply(rangeHasMore -> { + if (rangeHasMore && ((0 < transactionTimeQuotaMillis && elapsedTimeMillis() > transactionTimeQuotaMillis) || + (0 < maxRecordDeletesPerTransaction && singleIterationQuotaManager.deletesCount >= maxRecordDeletesPerTransaction))) { + // Reached time/delete
quota in this transaction. Continue in a new one (possibly after throttling) + return false; + } + return rangeHasMore; + }), + executor) + .whenComplete((r, e) -> cursor.close()); + }).thenApply(ignore -> cont.get()); + } + + private CompletableFuture handleSuccess(QuotaManager quotaManager) { + runUnlessNull(transactionSuccessNotification, quotaManager); // let the user know about this successful range iteration + + if (!quotaManager.hasMore) { + // Here: all done, no need for throttling + return AsyncUtil.READY_FALSE; + } + + // Maybe increase the cursor's row limit (considered once every SUCCESS_INCREASE_THRESHOLD successes since the last failure) + ++successCounter; + if (((successCounter) % SUCCESS_INCREASE_THRESHOLD) == 0 && cursorRowsLimit < (quotaManager.scannedCount + 3)) { + final int oldLimit = cursorRowsLimit; + cursorRowsLimit = increaseLimit(oldLimit); + if (logger.isInfoEnabled() && (oldLimit != cursorRowsLimit)) { + logger.info(KeyValueLogMessage.of("ThrottledIterator: iterate one range success: increase limit", + LogMessageKeys.LIMIT, cursorRowsLimit, + LogMessageKeys.OLD_LIMIT, oldLimit, + LogMessageKeys.SUCCESSFUL_TRANSACTIONS_COUNT, successCounter)); + } + } + failureRetriesCounter = 0; + + // Here: calculate delay + long rangeProcessingTimeMillis = Math.max(0, elapsedTimeMillis()); + long toWaitMillis = Collections.max(List.of( + // delay required for max deletes per second throttling + throttlePerSecGetDelayMillis(rangeProcessingTimeMillis, maxRecordDeletesPerSec, quotaManager.deletesCount), + // delay required for max records scanned per second throttling + throttlePerSecGetDelayMillis(rangeProcessingTimeMillis, maxRecordScannedPerSec, quotaManager.scannedCount) + )); + + if (toWaitMillis > 0) { + // Schedule another transaction according to the max number per second + final CompletableFuture result = MoreAsyncUtil.delayedFuture(toWaitMillis, TimeUnit.MILLISECONDS, scheduledExecutor); + // Register the externally-provided future with the manager so that it is closed once the runner is closed + return
futureManager.registerFuture(result).thenApply(ignore -> true); + } else { + return AsyncUtil.READY_TRUE; + } + } + + private CompletableFuture handleFailure(Throwable ex, QuotaManager quotaManager) { + // Note: the transactional runner does not retry internally + ++failureRetriesCounter; + if (failureRetriesCounter > numOfRetries) { + if (logger.isWarnEnabled()) { + logger.warn(KeyValueLogMessage.of("ThrottledIterator: iterate one range failure: will abort", + LogMessageKeys.LIMIT, cursorRowsLimit, + LogMessageKeys.RETRY_COUNT, failureRetriesCounter), + ex); + } + // Retries exhausted: complete exceptionally + return CompletableFuture.failedFuture(ex); + } + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + if (ex instanceof FDBDatabaseRunner.RunnerClosed) { + if (logger.isWarnEnabled()) { + logger.warn(KeyValueLogMessage.of("ThrottledIterator: runner closed: will abort"), ex); + } + // Complete exceptionally, do not retry + return CompletableFuture.failedFuture(ex); + } + + // Here: after a failure, try setting a scan quota that is smaller than the number of scanned items during the failure + successCounter = 0; + final int oldLimit = cursorRowsLimit; + cursorRowsLimit = decreaseLimit(quotaManager.scannedCount); + if (logger.isInfoEnabled() && (oldLimit != cursorRowsLimit)) { + logger.info(KeyValueLogMessage.of("ThrottledIterator: iterate one range failure: will retry", + LogMessageKeys.LIMIT, cursorRowsLimit, + LogMessageKeys.OLD_LIMIT, oldLimit, + LogMessageKeys.RETRY_COUNT, failureRetriesCounter), + ex); + } + + return AsyncUtil.READY_TRUE; // retry + } + + @VisibleForTesting + static long throttlePerSecGetDelayMillis(long rangeProcessingTimeMillis, int maxPerSec, int eventsCount) { + if (maxPerSec <= 0) { + return 0; // do not throttle + } + // get the number of events, get the min time they should have taken, + // and return a padding time (if positive) + // MS(count / perSec) - ptimeMillis ==> MS(count) / perSec - ptimeMillis (avoid floating point, the
floor effect is a negligible 0.005%) + long waitMillis = (TimeUnit.SECONDS.toMillis(eventsCount) / maxPerSec) - rangeProcessingTimeMillis; + return waitMillis > 0 ? waitMillis : 0; + } + + private long nowMillis() { + return System.currentTimeMillis(); + } + + private long elapsedTimeMillis() { + return rangeIterationStartTimeMilliseconds <= 0 ? 0 : + nowMillis() - rangeIterationStartTimeMilliseconds; + } + + private static void runUnlessNull(@Nullable Consumer func, QuotaManager quotaManager) { + if (func != null) { + func.accept(quotaManager); + } + } + + @VisibleForTesting + static int increaseLimit(final int current) { + if (current == 0) { + return 0; + } + return (Math.max((current * 5) / 4, current + 4)); + } + + @VisibleForTesting + static int decreaseLimit(final int lastScanned) { + return Math.max(1, (lastScanned * 9) / 10); + } + + /** + * A class that manages the resource constraints of the iterator. + * This class is used by the iterator and is also given to the callbacks. It reflects the current state of the controlled + * constraints and helps determine whether a transaction should be committed and another started. + * The quota manager's lifecycle is attached to the transaction. Once a new transaction starts, these counts get reset. + */ + public static class QuotaManager { + int deletesCount; + int scannedCount; + boolean hasMore; + + public int getDeletesCount() { + return deletesCount; + } + + public int getScannedCount() { + return scannedCount; + } + + /** + * Increment deleted item number by count. + * @param count the number of items to increment deleted count by + */ + public void deleteCountAdd(int count) { + deletesCount += count; + } + + /** + * Increment deleted item number by 1. + */ + public void deleteCountInc() { + deletesCount++; + } + + /** + * Mark this source as exhausted. This effectively stops the iteration after this item.
+ */ + public void markExhausted() { + hasMore = false; + } + + void init() { + deletesCount = 0; + scannedCount = 0; + hasMore = true; + } + } + + public static Builder builder(TransactionalRunner runner, + Executor executor, + ScheduledExecutorService scheduledExecutor, + CursorFactory cursorCreator, + ItemHandler singleItemHandler) { + return new Builder<>(runner, executor, scheduledExecutor, cursorCreator, singleItemHandler); + } + + public static Builder builder(FDBDatabase database, + CursorFactory cursorCreator, + ItemHandler singleItemHandler) { + return new Builder<>(database, FDBRecordContextConfig.newBuilder(), cursorCreator, singleItemHandler); + } + + /** + * A builder class for the iterator. + * + * @param the item type being iterated on. + */ + public static class Builder { + public TransactionalRunner transactionalRunner; + public Executor executor; + public ScheduledExecutorService scheduledExecutor; + private final CursorFactory cursorCreator; + private final ItemHandler singleItemHandler; + private Consumer transactionSuccessNotification; + private Consumer transactionInitNotification; + private int transactionTimeQuotaMillis; + private int maxRecordDeletesPerTransaction; + private int maxRecordScannedPerSec; + private int maxRecordDeletesPerSec; + private int numOfRetries; + + /** + * Constructor. Sets the mandatory collaborators; every other setting gets a default value. + * @param runner the FDB runner to use when creating transactions + * @param cursorCreator the factory to use when creating the inner cursor + * @param singleItemHandler the handler of a single item while iterating + */ + private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory cursorCreator, ItemHandler singleItemHandler) { + // Mandatory fields are set in the constructor. Everything else is optional.
+ this.transactionalRunner = runner; + this.executor = executor; + this.scheduledExecutor = scheduledExecutor; + this.cursorCreator = cursorCreator; + this.singleItemHandler = singleItemHandler; + // set defaults + this.transactionTimeQuotaMillis = (int)TimeUnit.SECONDS.toMillis(4); + this.maxRecordDeletesPerTransaction = 0; + this.maxRecordScannedPerSec = 0; + this.maxRecordDeletesPerSec = 0; + this.numOfRetries = NUMBER_OF_RETRIES; + } + + private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory cursorCreator, ItemHandler singleItemHandler) { + this(new TransactionalRunner(database, contextConfigBuilder), + database.newContextExecutor(contextConfigBuilder.getMdcContext()), + database.getScheduledExecutor(), + cursorCreator, + singleItemHandler); + } + + /** + * Set the amount of time for each transaction before committing and starting another. + * Defaults to 4000 milliseconds; a value of 0 (or less) disables the time quota. + * @param transactionTimeQuotaMillis the maximum duration of a transaction, in milliseconds. + * @return this builder + */ + public Builder withTransactionTimeQuotaMillis(int transactionTimeQuotaMillis) { + this.transactionTimeQuotaMillis = Math.max(0, transactionTimeQuotaMillis); + return this; + } + + /** + * Set the max number of records that can be scanned in a given second. + * This parameter will control the delay between transactions (not within a single transaction). Once a transaction + * has been committed, this will govern whether the iterator will delay starting the next one. + * Defaults to 0 (no limit). + * @param maxRecordsScannedPerSec the number of items scanned (on average) per second by the iterator + * @return this builder + */ + public Builder withMaxRecordsScannedPerSec(int maxRecordsScannedPerSec) { + this.maxRecordScannedPerSec = Math.max(0, maxRecordsScannedPerSec); + return this; + } + + /** + * Set the max number of records that can be deleted in a given second.
+ * This parameter will control the delay between transactions (not within a single transaction). Once a transaction + * has been committed, this will govern whether the iterator will delay starting the next one. + * Defaults to 0 (no limit). + * @param maxRecordsDeletesPerSec the number of items deleted (on average) per second by the iterator + * @return this builder + */ + public Builder withMaxRecordsDeletesPerSec(int maxRecordsDeletesPerSec) { + this.maxRecordDeletesPerSec = Math.max(0, maxRecordsDeletesPerSec); + return this; + } + + /** + * Set the callback to invoke on transaction commit. + * @param transactionSuccessNotification the callback invoked every time a transaction is successfully committed + * Defaults to null (no callback). + * @return this builder + */ + public Builder withTransactionSuccessNotification(Consumer transactionSuccessNotification) { + this.transactionSuccessNotification = transactionSuccessNotification; + return this; + } + + /** + * Set the callback to invoke on transaction start. + * @param transactionInitNotification the callback invoked every time a transaction is created + * Defaults to null (no callback). + * @return this builder + */ + public Builder withTransactionInitNotification(Consumer transactionInitNotification) { + this.transactionInitNotification = transactionInitNotification; + return this; + } + + /** + * Set the maximum number of items deleted within a transaction. + * Once this number has been reached the transaction will be committed and another will start. The actual number + * of deletes is determined by the {@link QuotaManager#deletesCount}, affected by the {@link #singleItemHandler} + * implementation. + * Defaults to 0 (no limit).
+ * @param maxRecordsDeletesPerTransaction the maximum number of items deleted in a transaction + * @return this builder + */ + public Builder withMaxRecordsDeletesPerTransaction(int maxRecordsDeletesPerTransaction) { + this.maxRecordDeletesPerTransaction = Math.max(0, maxRecordsDeletesPerTransaction); + return this; + } + + /** + * Set the number of retries after a failure. + * The iterator will retry a failed transaction for this number of times (with potentially different limits) + * before failing the iteration. + * This counter gets reset upon the next successful commit. + * Defaults to 100. + * @param numOfRetries the maximum number of retries for a transaction + * @return this builder + */ + public Builder withNumOfRetries(int numOfRetries) { + this.numOfRetries = Math.max(0, numOfRetries); + return this; + } + + /** + * Create the iterator. + * @return the newly minted iterator + */ + public ThrottledRetryingIterator build() { + return new ThrottledRetryingIterator<>(this); + } + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/package-info.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/package-info.java new file mode 100644 index 0000000000..880b5ec37f --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/package-info.java @@ -0,0 +1,25 @@ +/* + * package-info.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Throttled iterator: Iterator that can handle resource constraints and retry on failures. + * {@link com.apple.foundationdb.record.provider.foundationdb.cursors.throttled.ThrottledRetryingIterator} + */ +package com.apple.foundationdb.record.provider.foundationdb.cursors.throttled; diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunner.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunner.java new file mode 100644 index 0000000000..ca7210bfbb --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunner.java @@ -0,0 +1,347 @@ +/* + * RecordRepairRunner.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.apple.foundationdb.record.provider.foundationdb.recordrepair; + +import com.apple.foundationdb.annotation.API; +import com.apple.foundationdb.record.RecordCursorResult; +import com.apple.foundationdb.record.ScanProperties; +import com.apple.foundationdb.record.logging.KeyValueLogMessage; +import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; +import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.cursors.throttled.CursorFactory; +import com.apple.foundationdb.record.provider.foundationdb.cursors.throttled.ItemHandler; +import com.apple.foundationdb.record.provider.foundationdb.cursors.throttled.ThrottledRetryingIterator; +import com.apple.foundationdb.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * A class that iterates through all records in a given store and validates (and optionally repairs) them. + *

+ * When records in a store are suspected to be corrupt, this class can be used to bring the store back to a consistent state. + * The runner will relax many of the record-loading constraints to allow records to be scanned and validated. The current + * validation capabilities include detecting missing splits (payload and version) and corrupt data that results in records + * that cannot be deserialized. + *

+ * This runner is expected to run for extended periods of time, and therefore makes use of the {@link ThrottledRetryingIterator} + * to provide transaction resource management. The runner will create transactions and commit them as needed (and so does + * not have to be run from within an existing transaction). + *

+ * The runner provides two main entry points: + *

    + *
  • {@link #runValidationStats(FDBRecordStore.Builder, ValidationKind)} that iterates through the store and returns an aggregated + * count of all found issues
  • + *
  • {@link #runValidationAndRepair(FDBRecordStore.Builder, ValidationKind, boolean)} that iterates through the store and returns a list of all found issues
  • + *
+ * There is no significant performance difference between the two. The intent is to use the former to get a view of the store + * status and to verify that a store is fully repaired, and to use the latter to iterate through the store record, one chunk + * at a time and to perform repairs as needed. + *

+ * There are currently two kinds of validations that can be performed: + *

    + *
  • {@link ValidationKind#RECORD_VALUE} will verify that the record payload is in good shape: The data exists and + * the record can be deserialized
  • + *
  • {@link ValidationKind#RECORD_VALUE_AND_VERSION} will add to the previous validation the check that the record + * has a version present
  • + *
+ * The idea is that stores that are configured to not store version data can avoid the flurry of false positives by not + * attempting to verify version information. + *

+ * A note on repair: Repairing corrupt data would normally mean deleting the data (without trying to update indexes). + * Repairing a missing version would normally mean creating a new version for the record. + */ +@API(API.Status.EXPERIMENTAL) +public class RecordRepairRunner { + /** + * The type of validation to perform. + */ + public enum ValidationKind { RECORD_VALUE, RECORD_VALUE_AND_VERSION } + + private static final Logger logger = LoggerFactory.getLogger(RecordRepairRunner.class); + + @Nonnull + private final Builder config; + + private RecordRepairRunner(@Nonnull final Builder config) { + this.config = config; + } + + /** + * Create a builder for the runner. + * @param database the FDB database to use + * @return the builder instance + */ + static Builder builder(FDBDatabase database) { + return new Builder(database); + } + + /** + * Run a validation of the store and return an aggregated summary of the results. + * @param recordStoreBuilder the store builder to use + * @param validationKind which validation to run + * @return an aggregated result set of all the found issues + */ + public RecordValidationStatsResult runValidationStats(FDBRecordStore.Builder recordStoreBuilder, ValidationKind validationKind) { + RecordValidationStatsResult statsResult = new RecordValidationStatsResult(); + ThrottledRetryingIterator.Builder iteratorBuilder = + ThrottledRetryingIterator.builder(config.getDatabase(), cursorFactory(), countResultsHandler(statsResult, validationKind)); + iteratorBuilder = configureThrottlingIterator(iteratorBuilder, config); + try (ThrottledRetryingIterator iterator = iteratorBuilder.build()) { + iterator.iterateAll(recordStoreBuilder).join(); + } + return statsResult; + } + + /** + * Run a validation of the store and return a list of specific issues found.
+ * @param recordStoreBuilder the store builder to use + * @param validationKind which validation to run + * @param allowRepair whether to allow repair on the issues found + * @return a list of issues found + */ + public List runValidationAndRepair(FDBRecordStore.Builder recordStoreBuilder, ValidationKind validationKind, boolean allowRepair) { + if (allowRepair) { + throw new UnsupportedOperationException("Repair is not yet supported"); + } + + List validationResults = new ArrayList<>(); + ThrottledRetryingIterator.Builder iteratorBuilder = + ThrottledRetryingIterator.builder(config.getDatabase(), cursorFactory(), validateAndRepairHandler(validationResults, validationKind)); + iteratorBuilder = configureThrottlingIterator(iteratorBuilder, config); + try (final ThrottledRetryingIterator iterator = iteratorBuilder.build()) { + iterator.iterateAll(recordStoreBuilder).join(); + } + return validationResults; + } + + private CursorFactory cursorFactory() { + return (@Nonnull FDBRecordStore store, @Nullable RecordCursorResult lastResult, int rowLimit) -> { + byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); // todo: can be null... 
+ ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); + return store.scanRecordKeys(continuation, scanProperties); + }; + } + + private ItemHandler countResultsHandler(RecordValidationStatsResult statsResult, final ValidationKind validationKind) { + return (FDBRecordStore store, RecordCursorResult lastResult, ThrottledRetryingIterator.QuotaManager quotaManager) -> { + return validateInternal(lastResult, store, validationKind).thenAccept(result -> { + if (!result.isValid()) { + statsResult.increment(result.getErrorCode()); + } + }); + }; + } + + private ItemHandler validateAndRepairHandler(List results, final ValidationKind validationKind) { + return (FDBRecordStore store, RecordCursorResult primaryKey, ThrottledRetryingIterator.QuotaManager quotaManager) -> { + return validateInternal(primaryKey, store, validationKind).thenAccept(result -> { + if (!result.isValid()) { + results.add(result); + if ((config.getMaxResultsReturned() > 0) && (results.size() >= config.getMaxResultsReturned())) { + quotaManager.markExhausted(); + } + } + }); + }; + } + + private static CompletableFuture validateInternal(final RecordCursorResult primaryKey, final FDBRecordStore store, final ValidationKind validationKind) { + RecordValueValidator valueValidator = new RecordValueValidator(store); + CompletableFuture resultFuture = valueValidator.validateRecordAsync(primaryKey.get()); + if (validationKind == ValidationKind.RECORD_VALUE_AND_VERSION) { + resultFuture = resultFuture.thenCompose(valueValidation -> { + if (valueValidation.isValid()) { + RecordVersionValidator versionValidator = new RecordVersionValidator(store); + return versionValidator.validateRecordAsync(primaryKey.get()); + } else { + return CompletableFuture.completedFuture(valueValidation); + } + }); + } + return resultFuture; + } + + private ThrottledRetryingIterator.Builder configureThrottlingIterator(ThrottledRetryingIterator.Builder 
builder, Builder config) { + return builder + .withTransactionInitNotification(this::logStartTransaction) + .withTransactionSuccessNotification(this::logCommitTransaction) + .withTransactionTimeQuotaMillis(config.getTransactionTimeQuotaMillis()) + .withMaxRecordsDeletesPerTransaction(config.getMaxRecordDeletesPerTransaction()) + .withMaxRecordsScannedPerSec(config.getMaxRecordScannedPerSec()) + .withMaxRecordsDeletesPerSec(config.getMaxRecordDeletesPerSec()) + .withNumOfRetries(config.getNumOfRetries()); + } + + private void logStartTransaction(ThrottledRetryingIterator.QuotaManager quotaManager) { + if (logger.isInfoEnabled()) { + logger.info(KeyValueLogMessage.of("RecordRepairRunner: transaction started")); + } + } + + private void logCommitTransaction(ThrottledRetryingIterator.QuotaManager quotaManager) { + if (logger.isInfoEnabled()) { + logger.info(KeyValueLogMessage.of("RecordRepairRunner: transaction committed", + LogMessageKeys.RECORDS_SCANNED, quotaManager.getScannedCount(), + LogMessageKeys.RECORDS_DELETED, quotaManager.getDeletesCount())); + } + } + + /** + * A builder to configure and create a {@link RecordRepairRunner}. + */ + public static class Builder { + @Nonnull + private final FDBDatabase database; + private int maxResultsReturned = 10_000; + + private int transactionTimeQuotaMillis = (int)TimeUnit.SECONDS.toMillis(4); + private int maxRecordDeletesPerTransaction = 0; + private int maxRecordScannedPerSec = 0; + private int maxRecordDeletesPerSec = 0; + private int numOfRetries = 4; + + /** + * Constructor. + * @param database the FDB database to use + */ + public Builder(@Nonnull final FDBDatabase database) { + this.database = database; + } + + /** + * Finalize the build and create a runner. + * @return the newly created runner + */ + public RecordRepairRunner build() { + return new RecordRepairRunner(this); + } + + /** + * Limit the number of issues found. 
+ * This parameter is intended to stop the iteration once a number of issues has been found, as a means of controlling + * the size of the list returned. + * @param maxResultsReturned the maximum number of issues to be returned from the {@link #runValidationAndRepair(FDBRecordStore.Builder, ValidationKind, boolean)} method. + * Default: 10,000. A value of 0 or less means unlimited + * @return this builder + */ + public Builder withMaxResultsReturned(int maxResultsReturned) { + this.maxResultsReturned = maxResultsReturned; + return this; + } + + /** + * Limit the number of records deleted in a transaction. + * Records can be deleted as part of the repair process. Once this number is reached, the transaction gets committed + * and a new one is started. + * @param maxRecordDeletesPerTransaction the max number of records allowed to be deleted in a transaction. Default: 0 (unlimited) + * @return this builder + */ + public Builder withMaxRecordDeletesPerTransaction(final int maxRecordDeletesPerTransaction) { + this.maxRecordDeletesPerTransaction = maxRecordDeletesPerTransaction; + return this; + } + + /** + * Limit the amount of time a transaction can take. + * This will instruct the runner to stop a transaction once this duration has been reached. Note that each transaction + * is limited by default to 5 seconds so it cannot go beyond that. + * @param transactionTimeQuotaMillis the max number of milliseconds to spend in a transaction. Default: 4000 (4 seconds) + * @return this builder + */ + public Builder withTransactionTimeQuotaMillis(final int transactionTimeQuotaMillis) { + this.transactionTimeQuotaMillis = transactionTimeQuotaMillis; + return this; + } + + /** + * Limit the number of records that can be scanned every second. + * This would delay the next transaction to ensure the limit is maintained (while each record iteration is not restricted + * by itself). + * @param maxRecordScannedPerSec the average number of records to scan per second.
Default: 0 (unlimited) + * @return this builder + */ + public Builder withMaxRecordScannedPerSec(final int maxRecordScannedPerSec) { + this.maxRecordScannedPerSec = maxRecordScannedPerSec; + return this; + } + + /** + * Limit the number of records that can be deleted every second. + * This would delay the next transaction to ensure the limit is maintained (while each record iteration is not restricted + * by itself). + * @param maxRecordDeletesPerSec the average number of records to delete in per second. Default: 0 (unlimited) + * @return this builder + */ + public Builder withMaxRecordDeletesPerSec(final int maxRecordDeletesPerSec) { + this.maxRecordDeletesPerSec = maxRecordDeletesPerSec; + return this; + } + + /** + * Control the number of retries before failure. + * The runner will retry a transaction if failed. Once the max number of retries has been reached, the operation would fail. + * @param numOfRetries the maximum number of times to retry a transaction upon failure. Default: 4 + * @return this builder + */ + public Builder withNumOfRetries(final int numOfRetries) { + this.numOfRetries = numOfRetries; + return this; + } + + @Nonnull + public FDBDatabase getDatabase() { + return database; + } + + public int getMaxResultsReturned() { + return maxResultsReturned; + } + + public int getTransactionTimeQuotaMillis() { + return transactionTimeQuotaMillis; + } + + public int getMaxRecordDeletesPerTransaction() { + return maxRecordDeletesPerTransaction; + } + + public int getMaxRecordScannedPerSec() { + return maxRecordScannedPerSec; + } + + public int getMaxRecordDeletesPerSec() { + return maxRecordDeletesPerSec; + } + + public int getNumOfRetries() { + return numOfRetries; + } + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationStatsResult.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationStatsResult.java 
new file mode 100644 index 0000000000..4fd43d1fdc --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationStatsResult.java @@ -0,0 +1,45 @@ +/* + * RecordValidationStatsResult.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.recordrepair; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class RecordValidationStatsResult { + @Nonnull + private Map stats; + + public RecordValidationStatsResult() { + this.stats = new HashMap<>(); + } + + void increment(String code) { + stats.computeIfAbsent(code, ignore -> new AtomicInteger(0)).incrementAndGet(); + } + + @Nonnull + public Map getStats() { + return stats.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get())); + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidator.java index 68e9ed661d..c7ace97c93 100644 --- 
a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidator.java @@ -49,7 +49,7 @@ * Do not store validations results across transactions. * */ -@API(API.Status.EXPERIMENTAL) +@API(API.Status.INTERNAL) public interface RecordValidator { /** * Validate a record with the given primary key. diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValueValidator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValueValidator.java index 51d10a43fe..6d94166e9a 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValueValidator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValueValidator.java @@ -40,7 +40,7 @@ * A record that is valid according to this validator has a split set that is legal (either 0 or 1..n) - or is not split - * and a payload that can be serialized with the store's schema. 
*/ -@API(API.Status.EXPERIMENTAL) +@API(API.Status.INTERNAL) public class RecordValueValidator implements RecordValidator { public static final String CODE_SPLIT_ERROR = "SplitError"; public static final String CODE_DESERIALIZE_ERROR = "DeserializeError"; diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordVersionValidator.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordVersionValidator.java index 0a3f0a75c0..292005d23f 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordVersionValidator.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordVersionValidator.java @@ -31,7 +31,7 @@ * A record validator that ensures the record has a valid version. * A record that is valid with this validator has to have a valid value, has to exist and has to have a version. */ -@API(API.Status.EXPERIMENTAL) +@API(API.Status.INTERNAL) public class RecordVersionValidator implements RecordValidator { public static final String CODE_VERSION_MISSING_ERROR = "VersionMissingError"; public static final String CODE_RECORD_MISSING_ERROR = "RecordMissingError"; diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledIteratorTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledIteratorTest.java new file mode 100644 index 0000000000..69ef56f5a7 --- /dev/null +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/cursors/throttled/ThrottledIteratorTest.java @@ -0,0 +1,622 @@ +/* + * ThrottledIteratorTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.cursors.throttled; + +import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.async.MoreAsyncUtil; +import com.apple.foundationdb.record.RecordCursor; +import com.apple.foundationdb.record.ScanProperties; +import com.apple.foundationdb.record.TestRecords1Proto; +import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; +import com.apple.foundationdb.tuple.Tuple; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class ThrottledIteratorTest extends FDBRecordStoreTestBase { + @ParameterizedTest + 
@CsvSource({ + "1000,100,0,0", // less than max + "2000,100,180,0", + "100,10,1,0", + "105,10,1,0", + + "1000,100,100,0", // just right + "100,10,1,0", + + "1000,100,200,1000", // delay required - the more interesting cases... + "2000,100,210,100", + "250,100,100,750", + "250,50,100,1750", // 100 events should take two seconds, wait what it takes to get there + "1,50,100,1999", + "1999,50,100,1", + "10,10,1,90", // 10 events per second, require 100ms per one event + + "500,100,49,0", // consecutive + "500,100,50,0", + "500,100,51,10", + + }) + void testThrottledIteratorGetDelay(long rangeProcessingTimeMillis, int maxPerSec, int eventsCount, long expectedResult) { + long ret = ThrottledRetryingIterator.throttlePerSecGetDelayMillis(rangeProcessingTimeMillis, maxPerSec, eventsCount); + assertThat(ret).isEqualTo(expectedResult); + } + + @Test + void testIncreaseLimit() { + assertThat(ThrottledRetryingIterator.increaseLimit(0)).isEqualTo(0); + assertThat(ThrottledRetryingIterator.increaseLimit(100)).isEqualTo(125); + assertThat(ThrottledRetryingIterator.increaseLimit(1)).isEqualTo(5); + assertThat(ThrottledRetryingIterator.increaseLimit(3)).isEqualTo(7); + assertThat(ThrottledRetryingIterator.increaseLimit(10)).isEqualTo(14); + } + + @Test + void testDecreaseLimit() { + assertThat(ThrottledRetryingIterator.decreaseLimit(0)).isEqualTo(1); + assertThat(ThrottledRetryingIterator.decreaseLimit(1)).isEqualTo(1); + assertThat(ThrottledRetryingIterator.decreaseLimit(2)).isEqualTo(1); + assertThat(ThrottledRetryingIterator.decreaseLimit(3)).isEqualTo(2); + assertThat(ThrottledRetryingIterator.decreaseLimit(100)).isEqualTo(90); + } + + @CsvSource({"-1", "0", "1", "3", "100"}) + @ParameterizedTest + void testThrottleIteratorSuccessDeleteLimit(int deleteLimit) throws Exception { + // Iterate range, verify that the number of items deleted matches the number of records + // Ensure multiple transactions are playing nicely with the deleted limit + final int numRecords = 42; // mostly 
harmless + AtomicInteger iteratedCount = new AtomicInteger(0); // total number of scanned items + AtomicInteger deletedCount = new AtomicInteger(0); // total number of "deleted" items + AtomicInteger successTransactionCount = new AtomicInteger(0); // number of invocations of RangeSuccess callback + AtomicInteger limitRef = new AtomicInteger(-1); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + quotaManager.deleteCountAdd(1); + return AsyncUtil.DONE; + }; + final Consumer successNotification = quotaManager -> { + successTransactionCount.incrementAndGet(); + iteratedCount.addAndGet(quotaManager.getScannedCount()); + deletedCount.addAndGet(quotaManager.getDeletesCount()); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, null, successNotification, -1, deleteLimit, -1, -1, limitRef); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + + assertThat(iteratedCount.get()).isEqualTo(numRecords); + assertThat(deletedCount.get()).isEqualTo(numRecords); + + assertThat(limitRef.get()).isZero(); + if (deleteLimit <= 0) { + assertThat(successTransactionCount.get()).isOne(); + } else { + assertThat(successTransactionCount.get()).isEqualTo(numRecords / deleteLimit + 1); + } + } + + @CsvSource({"-1", "0", "50", "100"}) + @ParameterizedTest + void testThrottleIteratorSuccessSecondsLimit(int maxPerSecLimit) throws Exception { + // Iterate range, verify that the number of items scanned matches the number of records + // Assert that the total test takes longer because of the max per sec limit + final int numRecords = 50; + AtomicInteger iteratedCount = new AtomicInteger(0); // total number of scanned items + AtomicInteger scannedCount = new AtomicInteger(0); // total number of scanned items + AtomicInteger deletedCount = new AtomicInteger(0); // total number of "deleted" items + AtomicInteger 
successTransactionCount = new AtomicInteger(0); // number of invocations of RangeSuccess callback + AtomicInteger limitRef = new AtomicInteger(-1); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + quotaManager.deleteCountAdd(1); + // Fail the first time, to get the maxRowLimit going + scannedCount.addAndGet(1); + if (scannedCount.get() == 1) { + throw new RuntimeException("Blah"); + } + return AsyncUtil.DONE; + }; + final Consumer successNotification = quotaManager -> { + successTransactionCount.incrementAndGet(); + iteratedCount.addAndGet(quotaManager.getScannedCount()); + deletedCount.addAndGet(quotaManager.getDeletesCount()); + }; + + long startTimeMillis = System.currentTimeMillis(); + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, null, successNotification, maxPerSecLimit, -1, -1, -1, limitRef); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + + long totalTimeMillis = System.currentTimeMillis() - startTimeMillis; + assertThat(iteratedCount.get()).isEqualTo(numRecords); + assertThat(deletedCount.get()).isEqualTo(numRecords); + if (maxPerSecLimit > 0) { + assertThat(totalTimeMillis).isGreaterThan(TimeUnit.SECONDS.toMillis(numRecords / maxPerSecLimit)); + } + } + + @Test + void testThrottleIteratorTransactionTimeLimit() throws Exception { + // Set time limit for the transaction, add delay to each item handler + final int numRecords = 50; + final int delay = 10; + final int transactionTimeMillis = 50; + AtomicInteger initTransactionCount = new AtomicInteger(0); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + return MoreAsyncUtil.delayedFuture(delay, TimeUnit.MILLISECONDS); + }; + final Consumer initNotification = quotaManager -> { + initTransactionCount.incrementAndGet(); + }; + + long startTimeMillis = System.currentTimeMillis(); + try (FDBRecordContext 
context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, initNotification, null, -1, -1, -1, transactionTimeMillis, null); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + + long totalTimeMillis = System.currentTimeMillis() - startTimeMillis; + assertThat(totalTimeMillis).isGreaterThan(numRecords * delay); + assertThat(initTransactionCount.get()).isGreaterThanOrEqualTo(numRecords * delay / transactionTimeMillis); + } + + @CsvSource({"-1", "0", "1", "3", "100"}) + @ParameterizedTest + void testThrottleIteratorFailuresDeleteLimit(int deleteLimit) throws Exception { + // Fail some handlings, ensure transaction restarts, items scanned + final int numRecords = 43; + AtomicInteger totalScanned = new AtomicInteger(0); // number of items scanned + AtomicInteger totalDeleted = new AtomicInteger(0); // number of items deleted + AtomicInteger failCount = new AtomicInteger(0); // number of exception thrown + AtomicInteger transactionStartCount = new AtomicInteger(0); // number of invocations of transactionInit callback + AtomicInteger transactionCommitCount = new AtomicInteger(0); // number of invocations of transactionSuccess callback + AtomicInteger lastFailedItem = new AtomicInteger(0); // last item that triggered a failure + final AtomicInteger limitRef = new AtomicInteger(-1); + + final ItemHandler itemHandler = (store, item, quotaManager) -> CompletableFuture.supplyAsync(() -> { + // fail 5 times + if (failCount.get() < 5) { + int itemNumber = item.get(); + // fail every other item starting at item 3 + if ((itemNumber > 2) && (itemNumber >= lastFailedItem.get() + 2)) { + failCount.incrementAndGet(); + lastFailedItem.set(itemNumber); + throw new RuntimeException("intentionally failed while testing item " + item.get()); + } + } + quotaManager.deleteCountAdd(1); + return null; + }); + final Consumer initNotification = quotaManager -> { + 
transactionStartCount.incrementAndGet(); + }; + final Consumer successNotification = quotaManager -> { + transactionCommitCount.incrementAndGet(); + totalScanned.addAndGet(quotaManager.getScannedCount()); + totalDeleted.addAndGet(quotaManager.getDeletesCount()); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, initNotification, successNotification, -1, deleteLimit, -1, -1, limitRef); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + assertThat(totalScanned.get()).isEqualTo(numRecords); + assertThat(totalDeleted.get()).isEqualTo(numRecords); + assertThat(failCount.get()).isEqualTo(5); + assertThat(transactionStartCount.get()).isEqualTo(transactionCommitCount.get() + failCount.get()); + assertThat(limitRef.get()).isLessThanOrEqualTo(3); // Scan failure after 3 will cause the limit to become 3 + } + + @CsvSource({"-1", "0", "25", "50", "100"}) + @ParameterizedTest + void testThrottleIteratorWithFailuresSecondsLimit(int maxPerSecLimit) throws Exception { + // Assert correct handling of max per sec items with failures + final int numRecords = 43; + AtomicInteger totalScanned = new AtomicInteger(0); // number of items scanned + AtomicInteger failCount = new AtomicInteger(0); // number of exception thrown + AtomicInteger transactionStartCount = new AtomicInteger(0); // number of invocations of RangeInit callback + AtomicInteger transactionCommitCount = new AtomicInteger(0); // number of invocations of RangeSuccess callback + AtomicInteger lastFailedItem = new AtomicInteger(0); // last item that triggered a failure + + final ItemHandler itemHandler = (store, item, quotaManager) -> CompletableFuture.supplyAsync(() -> { + // fail 5 times + if (failCount.get() < 5) { + int itemNumber = item.get(); + // fail every other item starting at item 3 + if ((itemNumber > 2) && (itemNumber >= lastFailedItem.get() + 2)) 
{ + failCount.incrementAndGet(); + lastFailedItem.set(itemNumber); + throw new RuntimeException("intentionally failed while testing item " + item.get()); + } + } + return null; + }); + final Consumer initNotification = quotaManager -> { + transactionStartCount.incrementAndGet(); + }; + final Consumer successNotification = quotaManager -> { + transactionCommitCount.incrementAndGet(); + totalScanned.addAndGet(quotaManager.getScannedCount()); + }; + + long startTime = System.currentTimeMillis(); + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, initNotification, successNotification, maxPerSecLimit, -1, -1, -1, null); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + long totalTimeMillis = System.currentTimeMillis() - startTime; + if (maxPerSecLimit > 0) { + assertThat(totalTimeMillis).isGreaterThan(TimeUnit.SECONDS.toMillis(numRecords / maxPerSecLimit)); + } + + assertThat(totalScanned.get()).isEqualTo(numRecords); + assertThat(failCount.get()).isEqualTo(5); + assertThat(transactionStartCount.get()).isEqualTo(transactionCommitCount.get() + failCount.get()); // + } + + @CsvSource({"-1", "0", "1", "10"}) + @ParameterizedTest + void testConstantFailures(int numRetries) throws Exception { + // All item handlers will fail, ensure iteration fails with correct number of retries + final String failureMessage = "intentionally failed while testing"; + AtomicInteger transactionStart = new AtomicInteger(0); + AtomicBoolean success = new AtomicBoolean(false); + + final ItemHandler itemHandler = (store, item, quotaManager) -> futureFailure(); + final Consumer initNotification = quotaManager -> { + transactionStart.incrementAndGet(); + }; + final Consumer successNotification = quotaManager -> { + success.set(true); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + 
ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(500, itemHandler, initNotification, successNotification, -1, -1, numRetries, -1, null); + Throwable ex = Assertions.catchThrowableOfType(RuntimeException.class, () -> throttledIterator.build().iterateAll(recordStore.asBuilder()).join()); + + assertThat(ex.getMessage()).contains(failureMessage); + } + + if (numRetries == -1) { + assertThat(transactionStart.get()).isEqualTo(ThrottledRetryingIterator.NUMBER_OF_RETRIES + 1); + } else { + assertThat(transactionStart.get()).isEqualTo(numRetries + 1); + } + assertThat(success.get()).isFalse(); + } + + @Test + void testLimitHandlingOnFailure() throws Exception { + // Actually compare set limit when transactions fail + final String failureMessage = "intentionally failed while testing"; + final AtomicInteger limitRef = new AtomicInteger(0); + final AtomicInteger failCount = new AtomicInteger(0); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + int limit = limitRef.get(); + int scannedCount = quotaManager.getScannedCount(); + switch (failCount.get()) { + case 0: + assertThat(limit).isEqualTo(0); + if (scannedCount == 100) { + failCount.incrementAndGet(); + return futureFailure(); + } + return AsyncUtil.DONE; + case 1: + assertThat(limit).isEqualTo(90); // (90% of 100) + if (scannedCount == 50) { + failCount.incrementAndGet(); + return futureFailure(); + } + return AsyncUtil.DONE; + case 2: + assertThat(limit).isEqualTo(45); // (90% of 50) + // from now on: fail at first item + break; + default: + assertThat(failCount.get()).isLessThanOrEqualTo(100); + break; + } + failCount.incrementAndGet(); + return futureFailure(); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(999, itemHandler, null, null, -1, -1, -1, -1, limitRef); + Throwable ex = Assertions.catchThrowableOfType(RuntimeException.class, () -> 
throttledIterator.build().iterateAll(recordStore.asBuilder()).join()); + assertThat(ex.getMessage()).contains(failureMessage); + } + + assertThat(limitRef.get()).isOne(); + } + + @Test + void testLimitHandlingOnSuccess() throws Exception { + // Actually compare rows limit when transactions succeed + final AtomicInteger limitRef = new AtomicInteger(0); + final AtomicInteger fullCount = new AtomicInteger(0); + final ItemHandler itemHandler = (store, item, quotaManager) -> { + int limit = limitRef.get(); + int count = fullCount.incrementAndGet(); + // Fail once to get the limit down + if (count == 1) { + throw new RuntimeException("Blah"); + } + if (count <= 41) { // 1 * 40 + 1 (limit * successes) before change + assertThat(limit).isEqualTo(1); + } else if (count <= 241) { // 41 + (5 * 40) + assertThat(limit).isEqualTo(5); + } else if (count <= 601) { // 241 + (9 * 40) + assertThat(limit).isEqualTo(9); + } else { + // end all iterations + quotaManager.markExhausted(); + } + return AsyncUtil.DONE; + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(2000, itemHandler, null, null, -1, -1, -1, -1, limitRef); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + } + + @CsvSource({"0", "1", "20", "50"}) + @ParameterizedTest + void testEarlyReturn(int lastItemToScan) throws Exception { + // Early termination of iteration via setting markExhausted + final int numRecords = 50; + AtomicInteger totalScanned = new AtomicInteger(0); // number of items scanned + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + int itemNumber = item.get(); + if (itemNumber == lastItemToScan) { + quotaManager.markExhausted(); + } + return AsyncUtil.DONE; + }; + final Consumer successNotification = quotaManager -> { + totalScanned.addAndGet(quotaManager.getScannedCount()); + }; + + try (FDBRecordContext context = openContext()) { + 
openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder throttledIterator = + iteratorBuilder(numRecords, itemHandler, null, successNotification, -1, -1, -1, -1, null); + throttledIterator.build().iterateAll(recordStore.asBuilder()).join(); + } + assertThat(totalScanned.get()).isEqualTo(Math.min(50, lastItemToScan + 1)); + } + + @Test + void testWithRealRecords() throws Exception { + // A test with saved records, to see that future handling works + final int numRecords = 50; + List itemsScanned = new ArrayList<>(numRecords); + + final CursorFactory cursorFactory = (store, lastResult, rowLimit) -> { + final byte[] continuation = lastResult == null ? null : lastResult.getContinuation().toBytes(); + final ScanProperties scanProperties = ScanProperties.FORWARD_SCAN.with(executeProperties -> executeProperties.setReturnedRowLimit(rowLimit)); + return store.scanRecordKeys(continuation, scanProperties); + }; + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + return store.loadRecordAsync(item.get()).thenApply(rec -> { + TestRecords1Proto.MySimpleRecord.Builder simpleRec = TestRecords1Proto.MySimpleRecord.newBuilder(); + simpleRec.mergeFrom(rec.getRecord()); + itemsScanned.add((int)simpleRec.getRecNo()); + return null; + }); + }; + + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + for (int i = 0; i < numRecords; i++) { + final TestRecords1Proto.MySimpleRecord record = TestRecords1Proto.MySimpleRecord.newBuilder() + .setRecNo(i) + .setStrValueIndexed("Some text") + .setNumValue3Indexed(1415 + i * 7) + .build(); + recordStore.saveRecord(record); + } + commit(context); + } + + // For this test, start and finalize the iteration within the transaction + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + ThrottledRetryingIterator.Builder builder = ThrottledRetryingIterator + .builder(fdb, cursorFactory, itemHandler) + .withNumOfRetries(2); + try (ThrottledRetryingIterator 
iterator = builder.build()) { + iterator.iterateAll(recordStore.asBuilder()).join(); + } + } + assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList())); + + // For this test, start iteration within the transaction but allow it to run (and create more transactions) outside + // of the original transaction + itemsScanned.clear(); + ThrottledRetryingIterator iterator = ThrottledRetryingIterator + .builder(fdb, cursorFactory, itemHandler) + .withNumOfRetries(2) + .build(); + CompletableFuture iterateAll; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + iterateAll = iterator.iterateAll(recordStore.asBuilder()); + } + iterateAll.join(); + iterator.close(); + assertThat(itemsScanned).isEqualTo(IntStream.range(0, numRecords).boxed().collect(Collectors.toList())); + } + + @Test + void testLateCompleteFutures() throws Exception { + // A test that completes the first future outside the transaction + int numRecords = 50; + List> futures = new ArrayList<>(numRecords); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + // First future hangs on, all others are immediately completed + CompletableFuture future = (item.get() == 0) ? 
new CompletableFuture<>() : CompletableFuture.completedFuture(null); + futures.add(future); + return future; + }; + + ThrottledRetryingIterator throttledIterator = + iteratorBuilder(numRecords, itemHandler, null, null, -1, -1, -1, -1, null).build(); + final CompletableFuture iterateAll; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + iterateAll = throttledIterator.iterateAll(recordStore.asBuilder()); + } + // Only first future in the list - waiting for it to complete + assertThat(futures).hasSize(1); + // complete the first future, release all of them + futures.get(0).complete(null); + iterateAll.join(); + throttledIterator.close(); + assertThat(futures).hasSize(50); + } + + @Test + void testIteratorClosesIncompleteFutures() throws Exception { + // close the runner before the future completes (the futures should be closed) + int numRecords = 50; + AtomicInteger transactionStart = new AtomicInteger(0); + List> futures = new ArrayList<>(numRecords); + + final ItemHandler itemHandler = (store, item, quotaManager) -> { + // First future hangs on, all others are immediately completed + CompletableFuture future = (item.get() == 0) ? 
new CompletableFuture<>() : CompletableFuture.completedFuture(null); + futures.add(future); + return future; + }; + final Consumer initNotification = quotaManager -> { + transactionStart.incrementAndGet(); + }; + + ThrottledRetryingIterator throttledIterator = + iteratorBuilder(numRecords, itemHandler, initNotification, null, -1, -1, -1, -1, null).build(); + final CompletableFuture iterateAll; + try (FDBRecordContext context = openContext()) { + openSimpleRecordStore(context); + iterateAll = throttledIterator.iterateAll(recordStore.asBuilder()); + } + // Closing the iterator before the first future completes + throttledIterator.close(); + // Only first future in the list, none other was created since the first one didn't complete + assertThat(futures).hasSize(1); + assertThat(futures.get(0).isCompletedExceptionally()).isTrue(); + assertThatThrownBy(() -> futures.get(0).get()).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class); + // Overall status is failed because we can't runAsync() anymore + assertThatThrownBy(iterateAll::join).hasCauseInstanceOf(FDBDatabaseRunner.RunnerClosed.class); + // Only one transaction started (no retry), since the runner was closed + assertThat(transactionStart.get()).isOne(); + } + + private ThrottledRetryingIterator.Builder iteratorBuilder(final int numRecords, + final ItemHandler itemHandler, + final Consumer initNotification, + final Consumer successNotification, + final int maxPerSecLimit, + final int maxDeletedPerTransaction, final int numRetries, + final int transactionTimeMillis, final AtomicInteger limitRef) { + + ThrottledRetryingIterator.Builder throttledIterator = ThrottledRetryingIterator.builder(fdb, intCursor(numRecords, limitRef), itemHandler); + + if (successNotification != null) { + throttledIterator.withTransactionSuccessNotification(successNotification); + } + if (initNotification != null) { + throttledIterator.withTransactionInitNotification(initNotification); + } + if (maxPerSecLimit != -1) { + 
throttledIterator.withMaxRecordsScannedPerSec(maxPerSecLimit); + } + if (maxDeletedPerTransaction != -1) { + throttledIterator.withMaxRecordsDeletesPerTransaction(maxDeletedPerTransaction); + } + if (numRetries != -1) { + throttledIterator.withNumOfRetries(numRetries); + } + if (transactionTimeMillis != -1) { + throttledIterator.withTransactionTimeQuotaMillis(transactionTimeMillis); + } + return throttledIterator; + } + + private CursorFactory intCursor(int numInts, AtomicInteger limitRef) { + return listCursor(IntStream.range(0, numInts).boxed().collect(Collectors.toList()), limitRef); + } + + private CursorFactory listCursor(List items, AtomicInteger limitRef) { + return (store, cont, limit) -> { + if (limitRef != null) { + limitRef.set(limit); + } + final byte[] continuation = cont == null ? null : cont.getContinuation().toBytes(); + return RecordCursor.fromList(items, continuation).limitRowsTo(limit); + }; + } + + private CompletableFuture futureFailure() { + return CompletableFuture.failedFuture(new RuntimeException("intentionally failed while testing")); + } +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunnerTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunnerTest.java new file mode 100644 index 0000000000..ac4b10f557 --- /dev/null +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordRepairRunnerTest.java @@ -0,0 +1,504 @@ +/* + * RecordRepairRunnerTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.provider.foundationdb.recordrepair; + +import com.apple.foundationdb.record.RecordMetaData; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; +import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; +import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.ParameterizedTestUtils; +import com.google.protobuf.Message; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Test the store's {@link RecordRepairRunner} implementation. + * End to end test for the entire record validation process. 
+ */ +public class RecordRepairRunnerTest extends FDBRecordStoreTestBase { + public static Stream splitFormatVersion() { + return ParameterizedTestUtils.cartesianProduct( + ParameterizedTestUtils.booleans("splitLongRecords"), + ValidationTestUtils.formatVersions(), + ParameterizedTestUtils.booleans("storeVersions"), + Arrays.stream(RecordRepairRunner.ValidationKind.values())); + } + + @ParameterizedTest() + @MethodSource("splitFormatVersion") + void testValidateRecordsNoIssue(boolean splitLongRecords, FormatVersion formatVersion, boolean storeVersions, RecordRepairRunner.ValidationKind validationKind) throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + saveRecords(splitLongRecords, formatVersion, hook); + + FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + storeBuilder = store.asBuilder(); + } + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + RecordValidationStatsResult repairStats = runner.runValidationStats(storeBuilder, validationKind); + List repairResults = runner.runValidationAndRepair(storeBuilder, validationKind, false); + + // Verify records: If we are saving versions - all is OK. + // If we're not saving versions, they will be flagged as missing. 
+ if (storeVersions || validationKind.equals(RecordRepairRunner.ValidationKind.RECORD_VALUE)) { + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).hasSize(0); + } else { + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 50); + + Assertions.assertThat(repairResults).hasSize(50); + Assertions.assertThat(repairResults).allMatch(result -> + (!result.isValid()) && result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR) + ); + } + } + + @ParameterizedTest() + @MethodSource("splitFormatVersion") + void testValidateRecordsMissingRecord(boolean splitLongRecords, FormatVersion formatVersion, boolean storeVersions, RecordRepairRunner.ValidationKind validationKind) throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + List> records = saveRecords(splitLongRecords, formatVersion, hook); + // Delete five of the saved records + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + // Note that the primary keys start with 1, so a record's index in the list is one less than its primary key + store.deleteRecord(records.get(ValidationTestUtils.RECORD_INDEX_WITH_NO_SPLITS).getPrimaryKey()); + store.deleteRecord(records.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey()); + store.deleteRecord(records.get(21).getPrimaryKey()); + store.deleteRecord(records.get(22).getPrimaryKey()); + store.deleteRecord(records.get(44).getPrimaryKey()); + commit(context); + } + + RecordValidationStatsResult repairStats; + List repairResults; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + repairStats = 
runner.runValidationStats(store.asBuilder(), validationKind); + repairResults = runner.runValidationAndRepair(store.asBuilder(), validationKind, false); + } + + // Verify records: The missing records are gone, so won't be flagged, leaving only 45 records around. + if (storeVersions || validationKind.equals(RecordRepairRunner.ValidationKind.RECORD_VALUE)) { + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).hasSize(0); + } else { + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 45); + + Assertions.assertThat(repairResults).hasSize(45); + Assertions.assertThat(repairResults).allMatch(result -> + (!result.isValid()) && result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR) + ); + } + } + + public static Stream splitNumberFormatVersion() { + return ParameterizedTestUtils.cartesianProduct( + Stream.of(0, 1, 2, 3), + ValidationTestUtils.formatVersions(), + ParameterizedTestUtils.booleans("storeVersions"), + Arrays.stream(RecordRepairRunner.ValidationKind.values())); + } + + @ParameterizedTest + @MethodSource("splitNumberFormatVersion") + void testValidateMissingSplit(int splitNumber, FormatVersion formatVersion, boolean storeVersions, RecordRepairRunner.ValidationKind validationKind) throws Exception { + boolean splitLongRecords = true; + + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + List> savedRecords = saveRecords(splitLongRecords, formatVersion, hook); + // Delete a split + int recordIndex = (splitNumber == 0) ? 
ValidationTestUtils.RECORD_INDEX_WITH_NO_SPLITS : ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS; + final Tuple primaryKey = savedRecords.get(recordIndex).getPrimaryKey(); + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + // If operating on the short record, #0 is the only split + // If operating on the long record, splits can be 1,2,3 + // Use splitNumber to decide which record to operate on. + // Record #1 in the saved records is a short record, #33 is a long (split) record + byte[] split = ValidationTestUtils.getSplitKey(store, primaryKey, splitNumber); + store.ensureContextActive().clear(split); + commit(context); + } + + FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + storeBuilder = store.asBuilder(); + } + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + RecordValidationStatsResult repairStats = runner.runValidationStats(storeBuilder, validationKind); + List repairResults = runner.runValidationAndRepair(storeBuilder, validationKind, false); + + if (splitNumber == 0) { + if (storeVersions) { + if (ValidationTestUtils.versionStoredWithRecord(formatVersion)) { + // record split gone but version remains + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordValueValidator.CODE_SPLIT_ERROR, 1); + Assertions.assertThat(repairResults).hasSize(1); + Assertions.assertThat(repairResults.get(0)).isEqualTo(RecordValidationResult.invalid(primaryKey, RecordValueValidator.CODE_SPLIT_ERROR, "any")); + } else { + // record split gone and version elsewhere - record looks gone + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).isEmpty(); + } + } else { + if 
(validationKind.equals(RecordRepairRunner.ValidationKind.RECORD_VALUE)) { + // not storing and not checking versions + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).isEmpty(); + } else { + // not storing but checking version (one record considered gone) + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 49); + + Assertions.assertThat(repairResults).hasSize(49); + Assertions.assertThat(repairResults).allMatch(result -> + (!result.isValid()) && result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR) + ); + } + } + } else { + final String expectedError = (splitNumber == 3) ? RecordValueValidator.CODE_DESERIALIZE_ERROR : RecordValueValidator.CODE_SPLIT_ERROR; + if (storeVersions) { + // record split missing + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(expectedError, 1); + Assertions.assertThat(repairResults).hasSize(1); + Assertions.assertThat(repairResults.get(0)).isEqualTo(RecordValidationResult.invalid(primaryKey, expectedError, "any")); + } else { + if (validationKind.equals(RecordRepairRunner.ValidationKind.RECORD_VALUE)) { + // not storing and not checking versions - one split missing + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(expectedError, 1); + Assertions.assertThat(repairResults).hasSize(1); + Assertions.assertThat(repairResults.get(0)).isEqualTo(RecordValidationResult.invalid(primaryKey, expectedError, "any")); + } else { + // not storing but checking version (one record with split missing) + Assertions.assertThat(repairStats.getStats()).hasSize(2); + Assertions.assertThat(repairStats.getStats()).containsEntry(expectedError, 1); + 
Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 49); + + Assertions.assertThat(repairResults).hasSize(50); + Assertions.assertThat(repairResults).allMatch(result -> + result.equals(RecordValidationResult.invalid(primaryKey, expectedError, "Blah")) || + (!result.isValid() && result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR)) + ); + } + } + } + } + + @MethodSource("splitFormatVersion") + @ParameterizedTest + void testValidateRecordsMissingVersion(boolean splitLongRecords, FormatVersion formatVersion, boolean storeVersions, RecordRepairRunner.ValidationKind validationKind) throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + List> savedRecords = saveRecords(splitLongRecords, formatVersion, hook); + // Delete the versions for the first 20 records + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + for (int i = 0; i < 20; i++) { + byte[] versionKey = ValidationTestUtils.getSplitKey(store, savedRecords.get(i).getPrimaryKey(), -1); + store.ensureContextActive().clear(versionKey); + } + commit(context); + } + + RecordValidationStatsResult repairStats; + List repairResults; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + repairStats = runner.runValidationStats(store.asBuilder(), validationKind); + repairResults = runner.runValidationAndRepair(store.asBuilder(), validationKind, false); + } + + if (validationKind.equals(RecordRepairRunner.ValidationKind.RECORD_VALUE)) { + // not validating versions + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).isEmpty(); + } else { + if (!storeVersions) { + // checking but not 
storing versions + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 50); + Assertions.assertThat(repairResults).allMatch(result -> result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR)); + Assertions.assertThat(repairResults.stream().map(RecordValidationResult::getPrimaryKey).collect(Collectors.toList())) + .isEqualTo(IntStream.range(1, 51).boxed().map(Tuple::from).collect(Collectors.toList())); + } else { + if (!ValidationTestUtils.versionStoredWithRecord(formatVersion)) { + // versions stored elsewhere - none deleted + Assertions.assertThat(repairStats.getStats()).isEmpty(); + Assertions.assertThat(repairResults).isEmpty(); + } else { + // versions stored with records, 20 are deleted + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 20); + Assertions.assertThat(repairResults).allMatch(result -> result.getErrorCode().equals(RecordVersionValidator.CODE_VERSION_MISSING_ERROR)); + Assertions.assertThat(repairResults.stream().map(RecordValidationResult::getPrimaryKey).collect(Collectors.toList())) + .isEqualTo(IntStream.range(1, 21).boxed().map(Tuple::from).collect(Collectors.toList())); + } + } + } + } + + public static Stream formatVersion() { + return ParameterizedTestUtils.cartesianProduct( + ValidationTestUtils.formatVersions(), + Arrays.stream(RecordRepairRunner.ValidationKind.values())); + } + + @MethodSource("formatVersion") + @ParameterizedTest + void testValidateRecordsCorruptRecord(FormatVersion formatVersion, RecordRepairRunner.ValidationKind validationKind) throws Exception { + boolean splitLongRecords = true; + boolean storeVersions = true; + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + List> savedRecords = saveRecords(splitLongRecords, formatVersion, hook); + // corrupt the value of the record + try (FDBRecordContext context = openContext()) 
{ + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + byte[] key = ValidationTestUtils.getSplitKey(store, savedRecords.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey(), 1); + final byte[] value = new byte[] {1, 2, 3, 4, 5}; + store.ensureContextActive().set(key, value); + commit(context); + } + + RecordValidationStatsResult repairStats; + List repairResults; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + repairStats = runner.runValidationStats(store.asBuilder(), validationKind); + repairResults = runner.runValidationAndRepair(store.asBuilder(), validationKind, false); + } + + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordValueValidator.CODE_DESERIALIZE_ERROR, 1); + Assertions.assertThat(repairResults).hasSize(1); + Assertions.assertThat(repairResults).allMatch(result -> result.getErrorCode().equals(RecordValueValidator.CODE_DESERIALIZE_ERROR)); + } + + @MethodSource("formatVersion") + @ParameterizedTest + void testValidateRecordsCorruptVersion(FormatVersion formatVersion, RecordRepairRunner.ValidationKind validationKind) throws Exception { + boolean splitLongRecords = true; + boolean storeVersions = true; + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); + List> savedRecords = saveRecords(splitLongRecords, formatVersion, hook); + // corrupt the value of the version + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + byte[] key = ValidationTestUtils.getSplitKey(store, savedRecords.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey(), -1); + final byte[] value = new byte[] {1, 2, 3, 4, 5}; + 
store.ensureContextActive().set(key, value); + commit(context); + } + + FDBRecordStore.Builder storeBuilder; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + storeBuilder = store.asBuilder(); + } + + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + // We don't currently support detecting this kind of error + Assertions.assertThatThrownBy(() -> runner.runValidationStats(storeBuilder, validationKind)).hasCauseInstanceOf(UnknownValidationException.class); + Assertions.assertThatThrownBy(() -> runner.runValidationAndRepair(storeBuilder, validationKind, false)).hasCauseInstanceOf(UnknownValidationException.class); + } + + // list of arguments for version and a bitset that has all the combinations of 4 bits set (except all unset) + private static Stream versionAndBitset() { + return ParameterizedTestUtils.cartesianProduct( + ValidationTestUtils.formatVersions(), + ValidationTestUtils.splitsToRemove()); + } + + /** + * A test that runs through all the combinations of 4 bits and erases a split for every bit that is set. + * This simulates all the combinations of splits that can go missing for a record with 3 splits + * (version, splits 1-3). 
+ * + * @param formatVersion the version format + * @param splitsToRemove the splits to remove + */ + @ParameterizedTest + @MethodSource("versionAndBitset") + void testValidateRecordCombinationSplitMissing(FormatVersion formatVersion, BitSet splitsToRemove) throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(true, true); + List> result = saveRecords(true, formatVersion, hook); + // Delete the splits for two of the long records + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + // Delete all the splits that have a bit set + splitsToRemove.stream().forEach(bit -> { + // bit #0 is the version (-1) + // bits #1 - #3 are the split numbers (no split #0 for a split record) + int split = (bit == 0) ? -1 : bit; + byte[] key = ValidationTestUtils.getSplitKey(store, result.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey(), split); + store.ensureContextActive().clear(key); + key = ValidationTestUtils.getSplitKey(store, result.get(ValidationTestUtils.RECORD_INDEX_WITH_TWO_SPLITS).getPrimaryKey(), split); + store.ensureContextActive().clear(key); + }); + commit(context); + } + + RecordRepairRunner.ValidationKind validationKind = RecordRepairRunner.ValidationKind.RECORD_VALUE_AND_VERSION; + FDBRecordStore.Builder storeBuilder; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); + storeBuilder = store.asBuilder(); + } + RecordRepairRunner runner = RecordRepairRunner.builder(fdb).build(); + RecordValidationStatsResult repairStats = runner.runValidationStats(storeBuilder, validationKind); + List repairResults = runner.runValidationAndRepair(storeBuilder, validationKind, false); + + Map validationResultMap = repairResults.stream() + .collect(Collectors.toMap(res -> (int)res.getPrimaryKey().getLong(0), res -> res)); + + // Assert that both records are 
either gone or are valid or flagged as corrupt + Assertions.assertThat( + ValidationTestUtils.recordWillDisappear(2, splitsToRemove, formatVersion) || + ValidationTestUtils.recordWillRemainValid(2, splitsToRemove, formatVersion) || + validationResultMap.containsKey(ValidationTestUtils.RECORD_ID_WITH_TWO_SPLITS)) + .isTrue(); + + Assertions.assertThat( + ValidationTestUtils.recordWillDisappear(3, splitsToRemove, formatVersion) || + ValidationTestUtils.recordWillRemainValid(3, splitsToRemove, formatVersion) || + validationResultMap.containsKey(ValidationTestUtils.RECORD_ID_WITH_THREE_SPLITS)) + .isTrue(); + } + + /** + * Don't store any versions but verify versions, so there would be many results (all records missing versions). + * Validate the max number of results. + * @param maxResultSize the max result size to return + */ + @ParameterizedTest + @CsvSource({"-1", "0", "1", "10", "100"}) + void testValidateMaxResultsReturned(int maxResultSize) throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(true, false); + final FormatVersion maximumSupportedVersion = FormatVersion.getMaximumSupportedVersion(); + saveRecords(true, maximumSupportedVersion, hook); + + RecordRepairRunner.ValidationKind validationKind = RecordRepairRunner.ValidationKind.RECORD_VALUE_AND_VERSION; + FDBRecordStore.Builder storeBuilder; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, maximumSupportedVersion); + storeBuilder = store.asBuilder(); + } + + int expectedResultSize; + switch (maxResultSize) { + case -1: + case 0: + case 100: + expectedResultSize = 50; + break; + default: + expectedResultSize = maxResultSize; + break; + } + + RecordRepairRunner runner = RecordRepairRunner.builder(fdb) + .withMaxResultsReturned(maxResultSize) + .build(); + RecordValidationStatsResult repairStats = runner.runValidationStats(storeBuilder, validationKind); + List repairResults = 
runner.runValidationAndRepair(storeBuilder, validationKind, false); + + Assertions.assertThat(repairStats.getStats()).hasSize(1); + Assertions.assertThat(repairStats.getStats()).containsEntry(RecordVersionValidator.CODE_VERSION_MISSING_ERROR, 50); + Assertions.assertThat(repairResults).hasSize(expectedResultSize); + } + + /** + * Allow only a few scans per sec. + * Validate the total length of time the validation takes. + */ + @Test + void testValidateMaxScansPerSec() throws Exception { + final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(true, true); + final FormatVersion maximumSupportedVersion = FormatVersion.getMaximumSupportedVersion(); + saveRecords(1, 200, true, maximumSupportedVersion, simpleMetaData(hook)); + + RecordRepairRunner.ValidationKind validationKind = RecordRepairRunner.ValidationKind.RECORD_VALUE_AND_VERSION; + FDBRecordStore.Builder storeBuilder; + + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = openSimpleRecordStore(context, hook, maximumSupportedVersion); + storeBuilder = store.asBuilder(); + } + RecordRepairRunner runner = RecordRepairRunner.builder(fdb) + // 200 records at 100 records / sec should average out to 2 seconds (actual scanning time is minimal) + .withMaxRecordScannedPerSec(100) + // have transaction as short as we can since the per-sec calculation only kicks in when transaction is done + .withTransactionTimeQuotaMillis(1) + .build(); + + long start = System.currentTimeMillis(); + RecordValidationStatsResult repairStats = runner.runValidationStats(storeBuilder, validationKind); + long mid = System.currentTimeMillis(); + List repairResults = runner.runValidationAndRepair(storeBuilder, validationKind, false); + long end = System.currentTimeMillis(); + + Assertions.assertThat(mid - start).isGreaterThan(2000); + Assertions.assertThat(end - mid).isGreaterThan(2000); + } + + + private List> saveRecords(final boolean splitLongRecords, FormatVersion formatVersion, final 
RecordMetaDataHook hook) throws Exception { + return saveRecords(1, 50, splitLongRecords, formatVersion, simpleMetaData(hook)); + } + + private List> saveRecords(int initialId, int totalRecords, final boolean splitLongRecords, FormatVersion formatVersion, final RecordMetaData metaData) throws Exception { + List> result; + try (FDBRecordContext context = openContext()) { + final FDBRecordStore store = createOrOpenRecordStore(context, metaData, path, formatVersion); + result = ValidationTestUtils.saveRecords(store, initialId, totalRecords, splitLongRecords); + commit(context); + } + return result; + } +} diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationTest.java index d7b68c33e4..1432fa8a06 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/RecordValidationTest.java @@ -329,7 +329,6 @@ void testValidateRecordCorruptSplit(FormatVersion formatVersion) throws Exceptio // Validate by primary key try (FDBRecordContext context = openContext()) { final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); - RecordValidator valueValidator = new RecordValueValidator(store); result.forEach(rec -> { validateRecordValue(store, rec.getPrimaryKey(), RecordValueValidator.CODE_DESERIALIZE_ERROR); }); @@ -390,6 +389,7 @@ private void validate(String expectedValueValidationCode, RecordValidator valida RecordValidationResult actualResult = null; actualResult = validator.validateRecordAsync(primaryKey).join(); + assertEquals(primaryKey, actualResult.getPrimaryKey()); if (expectedValueValidationCode.equals(RecordValidationResult.CODE_VALID)) { 
assertTrue(actualResult.isValid()); } else { diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ScanRecordKeysTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ScanRecordKeysTest.java index aabad50554..f61d5ac658 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ScanRecordKeysTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ScanRecordKeysTest.java @@ -36,9 +36,8 @@ import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; -import com.apple.foundationdb.record.provider.foundationdb.SplitHelper; import com.apple.foundationdb.tuple.Tuple; -import com.google.common.base.Strings; +import com.apple.test.ParameterizedTestUtils; import com.google.protobuf.Message; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -73,14 +72,6 @@ public class ScanRecordKeysTest extends FDBRecordStoreTestBase { private static final int ROW_LIMIT = 19; private static final int BYTES_LIMIT = 2000; - private static final int LONG_RECORD_SPACING = 17; - private static final int RECORD_INDEX_WITH_NO_SPLITS = 1; - private static final int RECORD_ID_WITH_NO_SPLITS = RECORD_INDEX_WITH_NO_SPLITS + 1; - private static final int RECORD_INDEX_WITH_TWO_SPLITS = 16; - private static final int RECORD_ID_WITH_TWO_SPLITS = RECORD_INDEX_WITH_TWO_SPLITS + 1; - private static final int RECORD_INDEX_WITH_THREE_SPLITS = 33; - private static final int RECORD_ID_WITH_THREE_SPLITS = RECORD_INDEX_WITH_THREE_SPLITS + 1; - public enum UseContinuations { NONE, CONTINUATIONS, BYTE_LIMIT } /** @@ -95,14 +86,14 @@ void monitorFormatVersion() { } public static 
Stream splitContinuationVersion() { - return Stream.of(true, false) - .flatMap(split -> Arrays.stream(UseContinuations.values()) - .flatMap(useContinuations -> ValidationTestUtils.formatVersions() - .flatMap(formatVersion -> Stream.of(true, false) - .map(storeVersions -> Arguments.of(split, useContinuations, formatVersion, storeVersions))))); + return ParameterizedTestUtils.cartesianProduct( + ParameterizedTestUtils.booleans("splitLongRecords"), + Arrays.stream(UseContinuations.values()), + ValidationTestUtils.formatVersions(), + ParameterizedTestUtils.booleans("storeVersions")); } - @ParameterizedTest(name = "testIterateRecordsNoIssue [splitLongRecords = {0}, useContinuations = {1}, formatVersion = {2}, storeVersions = {3}]") + @ParameterizedTest @MethodSource("splitContinuationVersion") void testIterateRecordsNoIssue(boolean splitLongRecords, UseContinuations useContinuations, FormatVersion formatVersion, boolean storeVersions) throws Exception { final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); @@ -114,7 +105,7 @@ void testIterateRecordsNoIssue(boolean splitLongRecords, UseContinuations useCon assertEquals(expectedKeys, actualKeys); } - @ParameterizedTest(name = "testIterateRecordsMissingRecord [splitLongRecords = {0}, useContinuations = {1}, formatVersion = {2}, storeVersions = {3}]") + @ParameterizedTest @MethodSource("splitContinuationVersion") void testIterateRecordsMissingRecord(boolean splitLongRecords, UseContinuations useContinuations, FormatVersion formatVersion, boolean storeVersions) throws Exception { final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, storeVersions); @@ -123,8 +114,8 @@ void testIterateRecordsMissingRecord(boolean splitLongRecords, UseContinuations try (FDBRecordContext context = openContext()) { final FDBRecordStore store = openSimpleRecordStore(context, hook, formatVersion); // Note that the primary keys start with 1, so the location 
is one-off when removed - store.deleteRecord(result.get(RECORD_INDEX_WITH_NO_SPLITS).getPrimaryKey()); - store.deleteRecord(result.get(RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey()); + store.deleteRecord(result.get(ValidationTestUtils.RECORD_INDEX_WITH_NO_SPLITS).getPrimaryKey()); + store.deleteRecord(result.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey()); store.deleteRecord(result.get(21).getPrimaryKey()); store.deleteRecord(result.get(22).getPrimaryKey()); store.deleteRecord(result.get(44).getPrimaryKey()); @@ -133,19 +124,20 @@ void testIterateRecordsMissingRecord(boolean splitLongRecords, UseContinuations // Scan records ScanProperties scanProperties = getScanProperties(useContinuations); final List actualKeys = scanKeys(useContinuations, formatVersion, hook, scanProperties); - List expectedKeys = getExpectedPrimaryKeys(i -> !Set.of(RECORD_ID_WITH_NO_SPLITS, RECORD_ID_WITH_THREE_SPLITS, 22, 23, 45).contains(i)); + List expectedKeys = getExpectedPrimaryKeys(i -> !Set.of(ValidationTestUtils.RECORD_ID_WITH_NO_SPLITS, ValidationTestUtils.RECORD_ID_WITH_THREE_SPLITS, 22, 23, 45).contains(i)); assertEquals(expectedKeys, actualKeys); } public static Stream splitNumberContinuationsVersion() { - return Stream.of(0, 1, 2, 3) - .flatMap(splitNumber -> Stream.of(UseContinuations.values()) - .flatMap(useContinuations -> ValidationTestUtils.formatVersions() - .flatMap(formatVersion -> Stream.of(true, false) - .map(storeVersions -> Arguments.of(splitNumber, useContinuations, formatVersion, storeVersions))))); + return ParameterizedTestUtils.cartesianProduct( + Stream.of(0, 1, 2, 3), + Arrays.stream(UseContinuations.values()), + ValidationTestUtils.formatVersions(), + ParameterizedTestUtils.booleans("storeVersions") + ); } - @ParameterizedTest(name = "testIterateRecordsMissingSplit [splitNumber = {0}, useContinuations = {1}, formatVersion = {2}, storeVersions = {3}]") + @ParameterizedTest @MethodSource("splitNumberContinuationsVersion") void 
testIterateRecordsMissingSplit(int splitNumber, UseContinuations useContinuations, FormatVersion formatVersion, boolean storeVersions) throws Exception { boolean splitLongRecords = true; @@ -159,7 +151,7 @@ void testIterateRecordsMissingSplit(int splitNumber, UseContinuations useContinu // If operating on the long record, splits can be 1,2,3 // Use splitNumber to decide which record to operate on. // Record #1 in the saved records is a short record, #33 is a long (split) record - int recordIndex = (splitNumber == 0) ? RECORD_INDEX_WITH_NO_SPLITS : RECORD_INDEX_WITH_THREE_SPLITS; + int recordIndex = (splitNumber == 0) ? ValidationTestUtils.RECORD_INDEX_WITH_NO_SPLITS : ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS; byte[] split = ValidationTestUtils.getSplitKey(store, savedRecords.get(recordIndex).getPrimaryKey(), splitNumber); store.ensureContextActive().clear(split); commit(context); @@ -172,7 +164,7 @@ void testIterateRecordsMissingSplit(int splitNumber, UseContinuations useContinu // When format version is below 6 and the record is a short record, deleting the only split will make the record disappear // When format version is 6 or 10, and we're not saving version, the same if ((splitNumber == 0) && (!ValidationTestUtils.versionStoredWithRecord(formatVersion) || !storeVersions)) { - expectedKeys = getExpectedPrimaryKeys(i -> i != RECORD_ID_WITH_NO_SPLITS); + expectedKeys = getExpectedPrimaryKeys(i -> i != ValidationTestUtils.RECORD_ID_WITH_NO_SPLITS); } else { expectedKeys = getExpectedPrimaryKeys(); } @@ -185,13 +177,14 @@ void testIterateRecordsMissingSplit(int splitNumber, UseContinuations useContinu } public static Stream splitContinuationFormatVersion() { - return Stream.of(true, false) - .flatMap(split -> Arrays.stream(UseContinuations.values()) - .flatMap(useContinuations -> ValidationTestUtils.formatVersions() - .map(formatVersion -> Arguments.of(split, useContinuations, formatVersion)))); + return ParameterizedTestUtils.cartesianProduct( + 
ParameterizedTestUtils.booleans("splitLongVersions"), + Arrays.stream(UseContinuations.values()), + ValidationTestUtils.formatVersions() + ); } - @ParameterizedTest(name = "testIterateRecordsMissingVersion [splitLongRecords = {0}, useContinuations = {1}, formatVersion = {2}]") + @ParameterizedTest @MethodSource("splitContinuationFormatVersion") void testIterateRecordsMissingVersion(boolean splitLongRecords, UseContinuations useContinuations, FormatVersion formatVersion) throws Exception { final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(splitLongRecords, true); @@ -220,7 +213,7 @@ void testIterateRecordsMissingVersion(boolean splitLongRecords, UseContinuations * @param useContinuations whether to use continuations * @param formatVersion what format version to use */ - @ParameterizedTest(name = "testIterateRecordsMixedVersions [splitLongRecords = {0}, useContinuations = {1}, formatVersion = {2}]") + @ParameterizedTest @MethodSource("splitContinuationFormatVersion") void testIterateRecordsMixedVersions(boolean splitLongRecords, UseContinuations useContinuations, FormatVersion formatVersion) throws Exception { // This test changes the metadata so needs special attention to the metadata version @@ -249,10 +242,11 @@ void testIterateRecordsMixedVersions(boolean splitLongRecords, UseContinuations // list of arguments for version and a bitset that has all the combinations of 4 bits set (except all unset) private static Stream continuationVersionAndBitset() { - return Arrays.stream(UseContinuations.values()) - .flatMap(useContinuations -> ValidationTestUtils.formatVersions() - .flatMap(version -> ValidationTestUtils.splitsToRemove() - .map(bitset -> Arguments.of(useContinuations, version, bitset)))); + return ParameterizedTestUtils.cartesianProduct( + Arrays.stream(UseContinuations.values()), + ValidationTestUtils.formatVersions(), + ValidationTestUtils.splitsToRemove() + ); } /** @@ -264,7 +258,7 @@ private static Stream 
continuationVersionAndBitset() { * @param formatVersion the version format * @param splitsToRemove the splits to remove */ - @ParameterizedTest(name = "testIterateRecordCombinationSplitMissing [useContinuations = {0}, formatVersion = {1}, splitsToRemove = {2}]") + @ParameterizedTest @MethodSource("continuationVersionAndBitset") void testIterateRecordCombinationSplitMissing(UseContinuations useContinuations, FormatVersion formatVersion, BitSet splitsToRemove) throws Exception { final RecordMetaDataHook hook = ValidationTestUtils.getRecordMetaDataHook(true, true); @@ -277,9 +271,9 @@ void testIterateRecordCombinationSplitMissing(UseContinuations useContinuations, // bit #0 is the version (-1) // bits #1 - #3 are the split numbers (no split #0 for a split record) int split = (bit == 0) ? -1 : bit; - byte[] key = ValidationTestUtils.getSplitKey(store, result.get(RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey(), split); + byte[] key = ValidationTestUtils.getSplitKey(store, result.get(ValidationTestUtils.RECORD_INDEX_WITH_THREE_SPLITS).getPrimaryKey(), split); store.ensureContextActive().clear(key); - key = ValidationTestUtils.getSplitKey(store, result.get(RECORD_INDEX_WITH_TWO_SPLITS).getPrimaryKey(), split); + key = ValidationTestUtils.getSplitKey(store, result.get(ValidationTestUtils.RECORD_INDEX_WITH_TWO_SPLITS).getPrimaryKey(), split); store.ensureContextActive().clear(key); }); commit(context); @@ -290,11 +284,11 @@ void testIterateRecordCombinationSplitMissing(UseContinuations useContinuations, final List actualKeys = scanKeys(useContinuations, formatVersion, hook, scanProperties); // The cases where the record will go missing altogether Set keysExpectedToDisappear = new HashSet<>(); - if (recordWillDisappear(2, splitsToRemove, formatVersion)) { - keysExpectedToDisappear.add(RECORD_ID_WITH_TWO_SPLITS); + if (ValidationTestUtils.recordWillDisappear(2, splitsToRemove, formatVersion)) { + keysExpectedToDisappear.add(ValidationTestUtils.RECORD_ID_WITH_TWO_SPLITS); } - 
if (recordWillDisappear(3, splitsToRemove, formatVersion)) { - keysExpectedToDisappear.add(RECORD_ID_WITH_THREE_SPLITS); + if (ValidationTestUtils.recordWillDisappear(3, splitsToRemove, formatVersion)) { + keysExpectedToDisappear.add(ValidationTestUtils.RECORD_ID_WITH_THREE_SPLITS); } List expectedKeys = getExpectedPrimaryKeys(i -> !keysExpectedToDisappear.contains(i)); @@ -307,25 +301,6 @@ void testIterateRecordCombinationSplitMissing(UseContinuations useContinuations, } } - private boolean recordWillDisappear(int numOfSplits, BitSet splitsToRemove, FormatVersion formatVersion) { - final BitSet allThreeSplits = ValidationTestUtils.toBitSet(0b1111); - final BitSet allThreeSplitsWithoutVersion = ValidationTestUtils.toBitSet(0b1110); - final BitSet allTwoSplits = ValidationTestUtils.toBitSet(0b0111); - final BitSet allTwoSplitsWithoutVersion = ValidationTestUtils.toBitSet(0b0110); - final boolean storingVersion = ValidationTestUtils.versionStoredWithRecord(formatVersion); - switch (numOfSplits) { - case 3: - return (splitsToRemove.equals(allThreeSplits) || - (!storingVersion && splitsToRemove.equals(allThreeSplitsWithoutVersion))); - case 2: - return (splitsToRemove.equals(allThreeSplits) || splitsToRemove.equals(allTwoSplits) || - (!storingVersion && - (splitsToRemove.equals(allThreeSplitsWithoutVersion) || splitsToRemove.equals(allTwoSplitsWithoutVersion)))); - default: - throw new IllegalArgumentException("Non supported number of splits"); - } - } - @Nullable private static ScanProperties getScanProperties(final UseContinuations useContinuations) { ExecuteProperties executeProperties; @@ -405,43 +380,19 @@ private void assertRecordsCorrupted(final FormatVersion formatVersion, final Rec } private List> saveRecords(final boolean splitLongRecords, FormatVersion formatVersion, final RecordMetaDataHook hook) throws Exception { - return saveRecords(1, 50, splitLongRecords, formatVersion, hook); - } - - private List> saveRecords(int initialId, int totalRecords, final 
boolean splitLongRecords, FormatVersion formatVersion, final RecordMetaDataHook hook) throws Exception { - return saveRecords(initialId, totalRecords, splitLongRecords, formatVersion, simpleMetaData(hook)); + return saveRecords(1, 50, splitLongRecords, formatVersion, simpleMetaData(hook)); } private List> saveRecords(int initialId, int totalRecords, final boolean splitLongRecords, FormatVersion formatVersion, final RecordMetaData metaData) throws Exception { List> result; try (FDBRecordContext context = openContext()) { final FDBRecordStore store = createOrOpenRecordStore(context, metaData, path, formatVersion); - List> result1 = new ArrayList<>(totalRecords); - for (int i = initialId; i < initialId + totalRecords; i++) { - final String someText = Strings.repeat("x", recordTextSize(splitLongRecords, i)); - final TestRecords1Proto.MySimpleRecord record = TestRecords1Proto.MySimpleRecord.newBuilder() - .setRecNo(i) - .setStrValueIndexed(someText) - .setNumValue3Indexed(1415 + i * 7) - .build(); - result1.add(store.saveRecord(record)); - } - result = result1; + result = ValidationTestUtils.saveRecords(store, initialId, totalRecords, splitLongRecords); commit(context); } return result; } - private int recordTextSize(boolean splitLongRecords, int recordId) { - // Every 17th record is long. 
The number of splits increases with the record ID - if (splitLongRecords && ((recordId % LONG_RECORD_SPACING) == 0)) { - final int sizeInSplits = recordId / LONG_RECORD_SPACING; - return SplitHelper.SPLIT_RECORD_SIZE * sizeInSplits + 2; - } else { - return 10; - } - } - @Nonnull private static List getExpectedPrimaryKeys() { return getExpectedPrimaryKeys(i -> true); diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ValidationTestUtils.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ValidationTestUtils.java index 8cd6ead5c9..359c47289f 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ValidationTestUtils.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/recordrepair/ValidationTestUtils.java @@ -20,17 +20,34 @@ package com.apple.foundationdb.record.provider.foundationdb.recordrepair; +import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreTestBase; +import com.apple.foundationdb.record.provider.foundationdb.FDBStoredRecord; import com.apple.foundationdb.record.provider.foundationdb.FormatVersion; +import com.apple.foundationdb.record.provider.foundationdb.SplitHelper; import com.apple.foundationdb.tuple.Tuple; +import com.google.common.base.Strings; +import com.google.protobuf.Message; import javax.annotation.Nonnull; +import java.util.ArrayList; import java.util.BitSet; +import java.util.List; import java.util.stream.LongStream; import java.util.stream.Stream; public class ValidationTestUtils { + private static final int LONG_RECORD_SPACING = 17; + // A few constants for records that were saved with saveRecords() below + public static final int RECORD_INDEX_WITH_NO_SPLITS = 1; + public 
static final int RECORD_ID_WITH_NO_SPLITS = RECORD_INDEX_WITH_NO_SPLITS + 1; + public static final int RECORD_INDEX_WITH_TWO_SPLITS = 16; + public static final int RECORD_ID_WITH_TWO_SPLITS = RECORD_INDEX_WITH_TWO_SPLITS + 1; + public static final int RECORD_INDEX_WITH_THREE_SPLITS = 33; + public static final int RECORD_ID_WITH_THREE_SPLITS = RECORD_INDEX_WITH_THREE_SPLITS + 1; + + @Nonnull public static Stream formatVersions() { return Stream.of( @@ -44,11 +61,75 @@ public static BitSet toBitSet(final long l) { return BitSet.valueOf(new long[] {l}); } + /** + * Create a stream of bitsets that represent splits combinations. + * The bitsets represent combinations of splits to be removed by the test: + * bit 0 is the version (Split #-1) + * bits 1-3 are the 1st (#1), 2nd (#2) or 3rd (#3) splits of a long (split) record + * @return a stream of splits combinations + */ @Nonnull public static Stream splitsToRemove() { return LongStream.range(1, 16).mapToObj(l -> toBitSet(l)); } + /** + * Return TRUE if a record matching the given parameters will disappear if the given splits are removed. + * This is useful to know since a gone record will not show up in the validation results. + * A record will disappear if all of its splits and version are deleted, or if we are not storing version with the + * record and all of its data is deleted. 
+ * @param numOfSplits number of splits for the record (2 or 3) + * @param splitsToRemove the bitset of splits to be removed + * @param formatVersion the format version for the store + * @return TRUE if the record will be gone if the splits are removed + */ + public static boolean recordWillDisappear(int numOfSplits, BitSet splitsToRemove, FormatVersion formatVersion) { + final BitSet allThreeSplits = toBitSet(0b1111); + final BitSet allThreeSplitsWithoutVersion = toBitSet(0b1110); + final BitSet allTwoSplits = toBitSet(0b0111); + final BitSet allTwoSplitsWithoutVersion = toBitSet(0b0110); + final boolean storingVersion = versionStoredWithRecord(formatVersion); + switch (numOfSplits) { + case 3: + return (splitsToRemove.equals(allThreeSplits) || + (!storingVersion && splitsToRemove.equals(allThreeSplitsWithoutVersion))); + case 2: + return (splitsToRemove.equals(allThreeSplits) || splitsToRemove.equals(allTwoSplits) || + (!storingVersion && + (splitsToRemove.equals(allThreeSplitsWithoutVersion) || splitsToRemove.equals(allTwoSplitsWithoutVersion)))); + default: + throw new IllegalArgumentException("Non supported number of splits"); + } + } + + /** + * Return TRUE if removing the given splits will leave the record in valid state. + * The record will remain valid if we are removing non-existing splits or a version when the version is stored elsewhere. 
+ * @param numOfSplits number of splits for the record (2 or 3) + * @param splitsToRemove the bitset of splits to be removed + * @param formatVersion the format version for the store + * @return TRUE if the record will be valid if the splits are removed + */ + public static boolean recordWillRemainValid(int numOfSplits, BitSet splitsToRemove, FormatVersion formatVersion) { + final BitSet thirdSplitOnly = toBitSet(0b1000); + final BitSet thirdSplitAndVerion = toBitSet(0b1001); + final BitSet versionSplitOnly = toBitSet(0b0001); + final boolean storingVersion = versionStoredWithRecord(formatVersion); + if ((numOfSplits == 2) && thirdSplitOnly.equals(splitsToRemove)) { + // removing non-existent 3rd split + return true; + } + if ((numOfSplits == 2) && thirdSplitAndVerion.equals(splitsToRemove) && !ValidationTestUtils.versionStoredWithRecord(formatVersion)) { + // version stored elsewhere and we remove version and non-existent split + return true; + } + if (versionSplitOnly.equals(splitsToRemove) && !ValidationTestUtils.versionStoredWithRecord(formatVersion)) { + // version stored elsewhere and we only remove the version + return true; + } + return false; + } + @Nonnull public static FDBRecordStoreTestBase.RecordMetaDataHook getRecordMetaDataHook(final boolean splitLongRecords) { return getRecordMetaDataHook(splitLongRecords, true); @@ -72,4 +153,33 @@ public static byte[] getSplitKey(FDBRecordStore store, Tuple primaryKey, int spl public static boolean versionStoredWithRecord(final FormatVersion formatVersion) { return formatVersion.isAtLeast(FormatVersion.SAVE_VERSION_WITH_RECORD); } + + + public static List> saveRecords(FDBRecordStore store, boolean splitLongRecords) throws Exception { + return saveRecords(store, 1, 50, splitLongRecords); + } + + public static List> saveRecords(FDBRecordStore store, int initialId, int totalRecords, final boolean splitLongRecords) throws Exception { + List> result1 = new ArrayList<>(totalRecords); + for (int i = initialId; i < 
initialId + totalRecords; i++) { + final String someText = Strings.repeat("x", recordTextSize(splitLongRecords, i)); + final TestRecords1Proto.MySimpleRecord record = TestRecords1Proto.MySimpleRecord.newBuilder() + .setRecNo(i) + .setStrValueIndexed(someText) + .setNumValue3Indexed(1415 + i * 7) + .build(); + result1.add(store.saveRecord(record)); + } + return result1; + } + + private static int recordTextSize(boolean splitLongRecords, int recordId) { + // Every 17th record is long. The number of splits increases with the record ID + if (splitLongRecords && ((recordId % LONG_RECORD_SPACING) == 0)) { + final int sizeInSplits = recordId / LONG_RECORD_SPACING; + return SplitHelper.SPLIT_RECORD_SIZE * sizeInSplits + 2; + } else { + return 10; + } + } }