Skip to content

Commit ddaf26d

Browse files
committed
[FLINK-37900][rocksdb] support configurable jitter in rocksdb state uploader
1 parent 784805f commit ddaf26d

File tree

9 files changed

+118
-11
lines changed

9 files changed

+118
-11
lines changed

docs/layouts/shortcodes/generated/expert_rocksdb_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size)</td>
1616
</tr>
17+
<tr>
18+
<td><h5>state.backend.rocksdb.checkpoint.upload-jitter</h5></td>
19+
<td style="word-wrap: break-word;">0 ms</td>
20+
<td>Duration</td>
21+
<td>The time interval used to create jitter for each checkpoint file upload.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>state.backend.rocksdb.localdir</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/rocksdb_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
<td>Integer</td>
1515
<td>The number of threads (per stateful operator) used to transfer (download and upload) files in RocksDBStateBackend.If negative, the common (TM) IO thread pool is used (see cluster.io-pool.size)</td>
1616
</tr>
17+
<tr>
18+
<td><h5>state.backend.rocksdb.checkpoint.upload-jitter</h5></td>
19+
<td style="word-wrap: break-word;">0 ms</td>
20+
<td>Duration</td>
21+
<td>The time interval used to create jitter for each checkpoint file upload.</td>
22+
</tr>
1723
<tr>
1824
<td><h5>state.backend.rocksdb.localdir</h5></td>
1925
<td style="word-wrap: break-word;">(none)</td>

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackend.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.io.IOException;
6565
import java.lang.reflect.Field;
6666
import java.net.URI;
67+
import java.time.Duration;
6768
import java.util.ArrayList;
6869
import java.util.Arrays;
6970
import java.util.List;
@@ -80,6 +81,7 @@
8081
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
8182
import static org.apache.flink.state.rocksdb.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
8283
import static org.apache.flink.state.rocksdb.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
84+
import static org.apache.flink.state.rocksdb.RocksDBOptions.CHECKPOINT_UPLOAD_JITTER;
8385
import static org.apache.flink.util.Preconditions.checkArgument;
8486
import static org.apache.flink.util.Preconditions.checkNotNull;
8587

@@ -140,6 +142,9 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
140142
/** Thread number used to transfer (download and upload) state, default value: 1. */
141143
private int numberOfTransferThreads;
142144

145+
/** The max duration of checkpoint uploader jitter. */
146+
private Duration checkpointUploadJitter;
147+
143148
/** The configuration for memory settings (pool sizes, etc.). */
144149
private final RocksDBMemoryConfiguration memoryConfiguration;
145150

@@ -233,6 +238,7 @@ public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing
233238
this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
234239
this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
235240
this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
241+
this.checkpointUploadJitter = CHECKPOINT_UPLOAD_JITTER.defaultValue();
236242
this.manualCompactionConfig = null;
237243
}
238244

@@ -267,6 +273,8 @@ protected EmbeddedRocksDBStateBackend(
267273
original.memoryConfiguration, config);
268274
this.memoryConfiguration.validate();
269275

276+
this.checkpointUploadJitter = config.get(CHECKPOINT_UPLOAD_JITTER);
277+
270278
this.priorityQueueConfig =
271279
RocksDBPriorityQueueConfig.fromOtherAndConfiguration(
272280
original.priorityQueueConfig, config);
@@ -522,6 +530,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
522530
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
523531
.setNativeMetricOptions(
524532
resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
533+
.setCheckpointUploadJitter(checkpointUploadJitter)
525534
.setWriteBatchSize(getWriteBatchSize())
526535
.setOverlapFractionThreshold(getOverlapFractionThreshold())
527536
.setIncrementalRestoreAsyncCompactAfterRescale(

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackendBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272

7373
import java.io.File;
7474
import java.io.IOException;
75+
import java.time.Duration;
7576
import java.util.ArrayList;
7677
import java.util.Collection;
7778
import java.util.LinkedHashMap;
@@ -132,6 +133,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
132133
private RocksDBNativeMetricOptions nativeMetricOptions;
133134

134135
private int numberOfTransferingThreads;
136+
private Duration uploadJitter;
135137
private long writeBatchSize =
136138
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
137139

@@ -200,6 +202,7 @@ public RocksDBKeyedStateBackendBuilder(
200202
this.nativeMetricOptions = new RocksDBNativeMetricOptions();
201203
this.numberOfTransferingThreads =
202204
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue();
205+
this.uploadJitter = RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue();
203206
}
204207

205208
@VisibleForTesting
@@ -271,6 +274,12 @@ RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(
271274
return this;
272275
}
273276

277+
RocksDBKeyedStateBackendBuilder<K> setCheckpointUploadJitter(Duration uploadJitter) {
278+
Preconditions.checkState(uploadJitter == null, "uploadJitter should not be null.");
279+
this.uploadJitter = uploadJitter;
280+
return this;
281+
}
282+
274283
RocksDBKeyedStateBackendBuilder<K> setWriteBatchSize(long writeBatchSize) {
275284
checkArgument(writeBatchSize >= 0, "Write batch size should be non negative.");
276285
this.writeBatchSize = writeBatchSize;
@@ -626,7 +635,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
626635
injectRocksDBStateUploader == null
627636
? new RocksDBStateUploader(
628637
RocksDBStateDataTransferHelper.forThreadNumIfSpecified(
629-
numberOfTransferingThreads, ioExecutor))
638+
numberOfTransferingThreads, ioExecutor),
639+
uploadJitter)
630640
: injectRocksDBStateUploader;
631641
if (enableIncrementalCheckpointing) {
632642
checkpointSnapshotStrategy =

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.configuration.description.Description;
2828
import org.apache.flink.configuration.description.TextElement;
2929

30+
import java.time.Duration;
31+
3032
import static org.apache.flink.configuration.ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE;
3133
import static org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
3234
import static org.apache.flink.state.rocksdb.PredefinedOptions.DEFAULT;
@@ -91,6 +93,15 @@ public class RocksDBOptions {
9193
+ CLUSTER_IO_EXECUTOR_POOL_SIZE.key()
9294
+ ")");
9395

96+
/** The time interval used to create jitter for each checkpoint file upload. */
97+
@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
98+
public static final ConfigOption<Duration> CHECKPOINT_UPLOAD_JITTER =
99+
ConfigOptions.key("state.backend.rocksdb.checkpoint.upload-jitter")
100+
.durationType()
101+
.defaultValue(Duration.ZERO)
102+
.withDescription(
103+
"The time interval used to create jitter for each checkpoint file upload.");
104+
94105
/** The predefined settings for RocksDB DBOptions and ColumnFamilyOptions by Flink community. */
95106
@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
96107
public static final ConfigOption<String> PREDEFINED_OPTIONS =

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBStateUploader.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,32 +32,44 @@
3232
import org.apache.flink.util.concurrent.FutureUtils;
3333
import org.apache.flink.util.function.CheckedSupplier;
3434

35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
3538
import javax.annotation.Nonnull;
3639

3740
import java.io.Closeable;
3841
import java.io.IOException;
3942
import java.io.InputStream;
4043
import java.nio.file.Files;
4144
import java.nio.file.Path;
45+
import java.time.Duration;
4246
import java.util.ArrayList;
4347
import java.util.List;
48+
import java.util.Random;
4449
import java.util.concurrent.CompletableFuture;
4550
import java.util.concurrent.ExecutionException;
51+
import java.util.function.Consumer;
4652
import java.util.stream.Collectors;
4753

4854
/** Help class for uploading RocksDB state files. */
4955
public class RocksDBStateUploader implements Closeable {
56+
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateUploader.class);
5057
private static final int READ_BUFFER_SIZE = 16 * 1024;
51-
58+
private final Duration uploadJitter;
59+
private final Random random;
5260
private final RocksDBStateDataTransferHelper transfer;
5361

5462
@VisibleForTesting
55-
public RocksDBStateUploader(int numberOfSnapshottingThreads) {
56-
this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads));
63+
/**
 * Creates an uploader whose transfer helper is obtained via {@code
 * RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads)}.
 *
 * @param numberOfSnapshottingThreads thread count forwarded to the transfer helper factory
 * @param uploadJitter jitter setting forwarded to the main constructor
 */
public RocksDBStateUploader(int numberOfSnapshottingThreads, Duration uploadJitter) {
    this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads), uploadJitter);
}
5868

59-
public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer) {
69+
/**
 * Creates an uploader that uses the given transfer helper and upload jitter.
 *
 * @param transfer helper used by this uploader; it is closed by {@link #close()}
 * @param uploadJitter upper bound for the random delay drawn in {@code applyJitter}; a zero
 *     duration disables the jitter
 * @throws NullPointerException if {@code uploadJitter} is {@code null}
 */
public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer, Duration uploadJitter) {
    // Fail fast here instead of with an NPE inside applyJitter while a checkpoint is uploading.
    if (uploadJitter == null) {
        throw new NullPointerException("uploadJitter must not be null.");
    }
    this.transfer = transfer;
    this.uploadJitter = uploadJitter;
    this.random = new Random();
}
6274

6375
/**
@@ -133,12 +145,14 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
133145
CheckpointedStateScope stateScope,
134146
CloseableRegistry closeableRegistry,
135147
CloseableRegistry tmpResourcesRegistry)
136-
throws IOException {
148+
throws IOException, InterruptedException {
137149

138150
InputStream inputStream = null;
139151
CheckpointStateOutputStream outputStream = null;
140152

141153
try {
154+
// add a random jitter
155+
applyJitter(new JitterConsumer());
142156
final byte[] buffer = new byte[READ_BUFFER_SIZE];
143157

144158
inputStream = Files.newInputStream(filePath);
@@ -180,6 +194,28 @@ private HandleAndLocalPath uploadLocalFileToCheckpointFs(
180194
}
181195
}
182196

197+
@VisibleForTesting
198+
void applyJitter(Consumer<Long> consumer) {
199+
if (uploadJitter.isZero()) {
200+
return;
201+
}
202+
203+
long milliseconds = random.nextLong(0, uploadJitter.toMillis() + 1);
204+
consumer.accept(milliseconds);
205+
}
206+
207+
static class JitterConsumer implements Consumer<Long> {
208+
209+
@Override
210+
public void accept(Long milliseconds) {
211+
try {
212+
Thread.sleep(milliseconds);
213+
} catch (InterruptedException e) {
214+
LOG.error("Fail to apply jitter in RocksDBStateUploader.", e);
215+
}
216+
}
217+
}
218+
183219
@Override
184220
public void close() throws IOException {
185221
this.transfer.close();

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,8 @@ public void setupRocksKeyedStateBackend() throws Exception {
282282
rocksDBStateUploader =
283283
spy(
284284
new RocksDBStateUploader(
285-
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
285+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(),
286+
RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue()));
286287
keyedStateBackendBuilder.setRocksDBStateUploader(rocksDBStateUploader);
287288
}
288289

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateUploaderTest.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.io.IOException;
4343
import java.nio.file.Files;
4444
import java.nio.file.Path;
45+
import java.time.Duration;
4546
import java.util.ArrayList;
4647
import java.util.Collections;
4748
import java.util.List;
@@ -51,6 +52,7 @@
5152

5253
import static org.assertj.core.api.Assertions.assertThat;
5354
import static org.assertj.core.api.Assertions.assertThatThrownBy;
55+
import static org.assertj.core.api.Assertions.fail;
5456

5557
/** Test class for {@link RocksDBStateUploader}. */
5658
public class RocksDBStateUploaderTest extends TestLogger {
@@ -91,7 +93,8 @@ public List<StreamStateHandle> duplicate(
9193

9294
List<Path> filePaths = new ArrayList<>(1);
9395
filePaths.add(file.toPath());
94-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
96+
try (RocksDBStateUploader rocksDBStateUploader =
97+
new RocksDBStateUploader(5, Duration.ZERO)) {
9598
assertThatThrownBy(
9699
() ->
97100
rocksDBStateUploader.uploadFilesToCheckpointFs(
@@ -136,7 +139,8 @@ void testUploadedSstCanBeCleanedUp() throws Exception {
136139
List<Path> filePaths =
137140
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
138141
CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
139-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(1)) {
142+
try (RocksDBStateUploader rocksDBStateUploader =
143+
new RocksDBStateUploader(1, Duration.ZERO)) {
140144
rocksDBStateUploader.uploadFilesToCheckpointFs(
141145
filePaths,
142146
checkpointStreamFactory,
@@ -211,7 +215,8 @@ void testMultiThreadUploadCorrectly() throws Exception {
211215
List<Path> sstFilePaths =
212216
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
213217

214-
try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
218+
try (RocksDBStateUploader rocksDBStateUploader =
219+
new RocksDBStateUploader(5, Duration.ZERO)) {
215220
List<HandleAndLocalPath> sstFiles =
216221
rocksDBStateUploader.uploadFilesToCheckpointFs(
217222
sstFilePaths,
@@ -233,6 +238,28 @@ void testMultiThreadUploadCorrectly() throws Exception {
233238
}
234239
}
235240

241+
@Test
242+
void testApplyJitter() throws Exception {
243+
try (RocksDBStateUploader rocksDBStateUploader =
244+
new RocksDBStateUploader(1, Duration.ofMillis(1000L))) {
245+
rocksDBStateUploader.applyJitter(
246+
sleepTime -> {
247+
assertThat(sleepTime).isGreaterThan(0);
248+
assertThat(sleepTime).isLessThanOrEqualTo(1000);
249+
});
250+
}
251+
}
252+
253+
@Test
254+
void testApplyDefaultJitter() throws Exception {
255+
try (RocksDBStateUploader rocksDBStateUploader =
256+
new RocksDBStateUploader(1, Duration.ZERO)) {
257+
258+
// If it is configured as Zero, the consumer function shouldn't be called
259+
rocksDBStateUploader.applyJitter(sleepTime -> fail());
260+
}
261+
}
262+
236263
private static CheckpointStateOutputStream createFailingCheckpointStateOutputStream(
237264
IOException failureException) {
238265
return new CheckpointStateOutputStream() {

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
114114

115115
RocksDBStateUploader rocksDBStateUploader =
116116
new RocksDBStateUploader(
117-
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
117+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(),
118+
RocksDBOptions.CHECKPOINT_UPLOAD_JITTER.defaultValue());
118119

119120
int keyGroupPrefixBytes =
120121
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);

0 commit comments

Comments
 (0)