Skip to content

Commit 38abdb4

Browse files
committed
[FLINK-37900][rocksdb] support configurable jitter in rocksdb state uploader
1 parent dda2acb commit 38abdb4

File tree

8 files changed

+84
-11
lines changed

8 files changed

+84
-11
lines changed

docs/layouts/shortcodes/generated/expert_rocksdb_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size)</td>
1616
</tr>
17+
<tr>
18+
<td><h5>state.backend.rocksdb.checkpoint.upload-jitter</h5></td>
19+
<td style="word-wrap: break-word;">0 ms</td>
20+
<td>Duration</td>
21+
<td>The time interval that will be used to create jitter for each checkpoint file upload.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>state.backend.rocksdb.localdir</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/rocksdb_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size)</td>
1616
</tr>
17+
<tr>
18+
<td><h5>state.backend.rocksdb.checkpoint.upload-jitter</h5></td>
19+
<td style="word-wrap: break-word;">0 ms</td>
20+
<td>Duration</td>
21+
<td>The time interval that will be used to create jitter for each checkpoint file upload.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>state.backend.rocksdb.localdir</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackendBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272

7373
import java.io.File;
7474
import java.io.IOException;
75+
import java.time.Duration;
7576
import java.util.ArrayList;
7677
import java.util.Collection;
7778
import java.util.LinkedHashMap;
@@ -132,6 +133,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
132133
private RocksDBNativeMetricOptions nativeMetricOptions;
133134

134135
private int numberOfTransferingThreads;
136+
private Duration uploadJitter;
135137
private long writeBatchSize =
136138
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
137139

@@ -200,6 +202,7 @@ public RocksDBKeyedStateBackendBuilder(
200202
this.nativeMetricOptions = new RocksDBNativeMetricOptions();
201203
this.numberOfTransferingThreads =
202204
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue();
205+
this.uploadJitter = RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue();
203206
}
204207

205208
@VisibleForTesting
@@ -626,7 +629,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
626629
injectRocksDBStateUploader == null
627630
? new RocksDBStateUploader(
628631
RocksDBStateDataTransferHelper.forThreadNumIfSpecified(
629-
numberOfTransferingThreads, ioExecutor))
632+
numberOfTransferingThreads, ioExecutor),
633+
uploadJitter)
630634
: injectRocksDBStateUploader;
631635
if (enableIncrementalCheckpointing) {
632636
checkpointSnapshotStrategy =

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.configuration.description.Description;
2828
import org.apache.flink.configuration.description.TextElement;
2929

30+
import java.time.Duration;
31+
3032
import static org.apache.flink.configuration.ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE;
3133
import static org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
3234
import static org.apache.flink.state.rocksdb.PredefinedOptions.DEFAULT;
@@ -91,6 +93,15 @@ public class RocksDBOptions {
9193
+ CLUSTER_IO_EXECUTOR_POOL_SIZE.key()
9294
+ ")");
9395

96+
/** The number of millisecond for RocksDBStateBackend checkpoint file upload jitter. */
97+
@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
98+
public static final ConfigOption<Duration> CHECKPOINT_UPLOAD_JITTER =
99+
ConfigOptions.key("state.backend.rocksdb.checkpoint.upload-jitter")
100+
.durationType()
101+
.defaultValue(Duration.ZERO)
102+
.withDescription(
103+
"The time interval that will be used to create jitter for each checkpoint file upload.");
104+
94105
/** The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. */
95106
@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
96107
public static final ConfigOption<String> PREDEFINED_OPTIONS =

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,32 @@
3939
import java.io.InputStream;
4040
import java.nio.file.Files;
4141
import java.nio.file.Path;
42+
import java.time.Duration;
4243
import java.util.ArrayList;
4344
import java.util.List;
45+
import java.util.Random;
4446
import java.util.concurrent.CompletableFuture;
4547
import java.util.concurrent.ExecutionException;
4648
import java.util.stream.Collectors;
4749

4850
/** Help class for uploading RocksDB state files. */
4951
public class RocksDBStateUploader implements Closeable {
5052
private static final int READ_BUFFER_SIZE = 16 * 1024;
51-
53+
private final Duration uploadJitter;
54+
private final Random random;
5255
private final RocksDBStateDataTransferHelper transfer;
5356

5457
@VisibleForTesting
55-
public RocksDBStateUploader(int numberOfSnapshottingThreads) {
56-
this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads));
58+
public RocksDBStateUploader(int numberOfSnapshottingThreads, Duration uploadJitter) {
59+
this(
60+
RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads),
61+
uploadJitter);
5762
}
5863

59-
public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer) {
64+
public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer, Duration uploadJitter) {
6065
this.transfer = transfer;
66+
this.uploadJitter = uploadJitter;
67+
this.random = new Random(uploadJitter.toMillis());
6168
}
6269

6370
/**
@@ -133,12 +140,14 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
133140
CheckpointedStateScope stateScope,
134141
CloseableRegistry closeableRegistry,
135142
CloseableRegistry tmpResourcesRegistry)
136-
throws IOException {
143+
throws IOException, InterruptedException {
137144

138145
InputStream inputStream = null;
139146
CheckpointStateOutputStream outputStream = null;
140147

141148
try {
149+
// add a random jitter
150+
applyJitter();
142151
final byte[] buffer = new byte[READ_BUFFER_SIZE];
143152

144153
inputStream = Files.newInputStream(filePath);
@@ -180,6 +189,17 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
180189
}
181190
}
182191

192+
@VisibleForTesting
193+
long applyJitter() throws InterruptedException {
194+
if (uploadJitter.isZero()) {
195+
return 0;
196+
}
197+
198+
long sleepMilliseconds = random.nextLong(0, uploadJitter.toMillis() + 1);
199+
Thread.sleep(sleepMilliseconds);
200+
return sleepMilliseconds;
201+
}
202+
183203
@Override
184204
public void close() throws IOException {
185205
this.transfer.close();

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ public void setupRocksKeyedStateBackend() throws Exception {
282282
rocksDBStateUploader =
283283
spy(
284284
new RocksDBStateUploader(
285-
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
285+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(),
286+
RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue()));
286287
keyedStateBackendBuilder.setRocksDBStateUploader(rocksDBStateUploader);
287288
}
288289

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateUploaderTest.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.io.IOException;
4343
import java.nio.file.Files;
4444
import java.nio.file.Path;
45+
import java.time.Duration;
4546
import java.util.ArrayList;
4647
import java.util.Collections;
4748
import java.util.List;
@@ -91,7 +92,8 @@ public List<StreamStateHandle> duplicate(
9192

9293
List<Path> filePaths = new ArrayList<>(1);
9394
filePaths.add(file.toPath());
94-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
95+
try (RocksDBStateUploader rocksDBStateUploader =
96+
new RocksDBStateUploader(5, Duration.ZERO)) {
9597
assertThatThrownBy(
9698
() ->
9799
rocksDBStateUploader.uploadFilesToCheckpointFs(
@@ -136,7 +138,8 @@ void testUploadedSstCanBeCleanedUp() throws Exception {
136138
List<Path> filePaths =
137139
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
138140
CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
139-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(1)) {
141+
try (RocksDBStateUploader rocksDBStateUploader =
142+
new RocksDBStateUploader(1, Duration.ZERO)) {
140143
rocksDBStateUploader.uploadFilesToCheckpointFs(
141144
filePaths,
142145
checkpointStreamFactory,
@@ -211,7 +214,8 @@ void testMultiThreadUploadCorrectly() throws Exception {
211214
List<Path> sstFilePaths =
212215
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
213216

214-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
217+
try (RocksDBStateUploader rocksDBStateUploader =
218+
new RocksDBStateUploader(5, Duration.ZERO)) {
215219
List<HandleAndLocalPath> sstFiles =
216220
rocksDBStateUploader.uploadFilesToCheckpointFs(
217221
sstFilePaths,
@@ -233,6 +237,26 @@ void testMultiThreadUploadCorrectly() throws Exception {
233237
}
234238
}
235239

240+
@Test
241+
void testApplyJitter() throws Exception {
242+
try (RocksDBStateUploader rocksDBStateUploader =
243+
new RocksDBStateUploader(1, Duration.ofMillis(1000L))) {
244+
long startTime = System.currentTimeMillis();
245+
long milliseconds = rocksDBStateUploader.applyJitter();
246+
assertThat(milliseconds).isLessThanOrEqualTo(1000);
247+
assertThat(System.currentTimeMillis() - startTime).isGreaterThanOrEqualTo(milliseconds);
248+
}
249+
}
250+
251+
@Test
252+
void testApplyDefaultJitter() throws Exception {
253+
try (RocksDBStateUploader rocksDBStateUploader =
254+
new RocksDBStateUploader(1, Duration.ZERO)) {
255+
long milliseconds = rocksDBStateUploader.applyJitter();
256+
assertThat(milliseconds).isEqualTo(0L);
257+
}
258+
}
259+
236260
private static CheckpointStateOutputStream createFailingCheckpointStateOutputStream(
237261
IOException failureException) {
238262
return new CheckpointStateOutputStream() {

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
114114

115115
RocksDBStateUploader rocksDBStateUploader =
116116
new RocksDBStateUploader(
117-
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
117+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(),
118+
RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue());
118119

119120
int keyGroupPrefixBytes =
120121
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);

0 commit comments

Comments
 (0)