Skip to content

Commit e5a5d44

Browse files
authored
Create throttled and retrying record iterator (#3350)
This PR creates the `ThrottledRetryingIterator` for use with cases that need a reliable retrying mechanism with resource controls. for the most part, this is new code that is not in use yet. Small changes to the existing code base (log message fields, executor method made public, refactoring of the FutureAutoClose) were made as well. This class is meant to live at the same level as other `FDBDatabaseRunner`s even though this class does not implement this interface. Hence, this is using the `TransactionalRunner` and `FutureAutoClose` classes instead of the (already retrying) `FDBDatabaseRunnerImpl`. The iterator implements resource controls - ops/sec and ops/transaction for scan and delete operations. It does not implement similar controls for other operations (update, create etc). These can be added later when they are needed. Resolves #3355
1 parent e8ba8e1 commit e5a5d44

File tree

7 files changed

+1472
-1
lines changed

7 files changed

+1472
-1
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/logging/LogMessageKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ public enum LogMessageKeys {
312312
TOTAL_RECORDS_SCANNED,
313313
TOTAL_RECORDS_SCANNED_DURING_FAILURES,
314314
SCRUB_TYPE,
315+
RETRY_COUNT,
315316

316317
// time limits milliseconds
317318
TIME_LIMIT_MILLIS("time_limit_milliseconds"),

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBDatabase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,13 @@ public Executor getExecutor() {
740740
return factory.getExecutor();
741741
}
742742

743-
protected Executor newContextExecutor(@Nullable Map<String, String> mdcContext) {
743+
/**
744+
* Create a new executor for the database. This is used internally when creating a transaction or a new runner.
745+
* @param mdcContext if present, the MDC context to be made available within the executors threads
746+
* @return the new executor
747+
*/
748+
@API(API.Status.INTERNAL)
749+
public Executor newContextExecutor(@Nullable Map<String, String> mdcContext) {
744750
return factory.newContextExecutor(mdcContext);
745751
}
746752

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* CursorFactory.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb.runners.throttled;
22+
23+
import com.apple.foundationdb.annotation.API;
24+
import com.apple.foundationdb.record.RecordCursor;
25+
import com.apple.foundationdb.record.RecordCursorResult;
26+
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
27+
28+
import javax.annotation.Nonnull;
29+
import javax.annotation.Nullable;
30+
31+
/**
32+
* Create a cursor with the given store and last result.
33+
* @param <T> the type of item the cursor iterates over.
34+
* This factory method is used by the {@link ThrottledRetryingIterator} to create inner cursors when needed.
35+
* The iterator creates transactions based off of the constraints given, and for each such transaction, a new inner
36+
* cursor gets created.
37+
*/
38+
@API(API.Status.EXPERIMENTAL)
39+
@FunctionalInterface
40+
public interface CursorFactory<T> {
41+
/**
42+
* Create a new inner cursor for the {@link ThrottledRetryingIterator}.
43+
* @param store the record store to use
44+
* @param lastResult the last result processed by the previous cursor (use for continuation). Null is none.
45+
* @param rowLimit the adjusted row limit to use
46+
* @return a newly created cursor with the given continuation and limit
47+
*/
48+
RecordCursor<T> createCursor(@Nonnull FDBRecordStore store, @Nullable RecordCursorResult<T> lastResult, int rowLimit);
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* ItemHandler.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.record.provider.foundationdb.runners.throttled;
22+
23+
import com.apple.foundationdb.annotation.API;
24+
import com.apple.foundationdb.record.RecordCursorResult;
25+
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
26+
27+
import javax.annotation.Nonnull;
28+
import java.util.concurrent.CompletableFuture;
29+
30+
/**
31+
* A handler of an item during the iteration of a {@link ThrottledRetryingIterator}.
32+
* @param <T> the type of element in the iteration
33+
*/
34+
@API(API.Status.EXPERIMENTAL)
35+
@FunctionalInterface
36+
public interface ItemHandler<T> {
37+
/**
38+
* Process an item.
39+
* Once done processing, return a future that controls whether to continue the iteration or stop.
40+
* The quota manager holds the current state of the iteration (per the current transaction). The handler can
41+
* change the state via {@link ThrottledRetryingIterator.QuotaManager#deleteCountAdd(int)},
42+
* {@link ThrottledRetryingIterator.QuotaManager#deleteCountInc()} and
43+
* {@link ThrottledRetryingIterator.QuotaManager#markExhausted()}.
44+
* @param store the record store to use
45+
* @param lastResult the result to process
46+
* @param quotaManager the current quota manager state
47+
* @return Future (Void) for when the operation is complete
48+
*/
49+
@Nonnull
50+
CompletableFuture<Void> handleOneItem(@Nonnull FDBRecordStore store, @Nonnull RecordCursorResult<T> lastResult, @Nonnull ThrottledRetryingIterator.QuotaManager quotaManager);
51+
}

0 commit comments

Comments
 (0)