From cc2af74259ce7d15b1857a640a47425ee8687b39 Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 14:03:13 +0200 Subject: [PATCH 1/7] Added state.backend.forst.checkpoint.transfer.thread.num config option for setting a transfer thread count --- .../flink/state/forst/ForStKeyedStateBackendBuilder.java | 2 +- .../java/org/apache/flink/state/forst/ForStOptions.java | 8 ++++++++ .../apache/flink/state/forst/ForStResourceContainer.java | 4 ++++ .../state/forst/datatransfer/ForStStateDataTransfer.java | 3 --- .../forst/restore/ForStIncrementalRestoreOperation.java | 2 +- .../forst/sync/ForStSyncKeyedStateBackendBuilder.java | 2 +- .../snapshot/ForStIncrementalSnapshotStrategyTest.java | 4 ++-- 7 files changed, 17 insertions(+), 8 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index d9b2bc3dd58c4..68aa391871275 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -442,7 +442,7 @@ private ForStRestoreOperation getForStRestoreOperation( long lastCompletedCheckpointId) { ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem()); if (enableIncrementalCheckpointing) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 8a816e3e1b13e..b11fcd4d49276 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -300,4 +300,12 @@ public class ForStOptions { + " Only valid when '" + EXECUTOR_WRITE_IO_INLINE.key() + "' is false."); + + @Documentation.Section(Documentation.Sections.EXPERT_FORST) + public static final ConfigOption CHECKPOINT_TRANSFER_THREAD_NUM = + ConfigOptions.key("state.backend.forst.checkpoint.transfer.thread.num") + .intType() + .defaultValue(4) + .withDescription( + "The number of transfer threads used to write or copy files to the state backend."); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index cb29cf1857cb1..6d7b225af2372 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -324,6 +324,10 @@ public int getWriteIoParallelism() { return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM); } + public int getDataTransferThreadNum() { + return configuration.get(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM); + } + /** * Prepare local and remote directories. * diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java index 0305f8cd02e9a..852fa08405e4b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java @@ -64,9 +64,6 @@ public class ForStStateDataTransfer implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(ForStStateDataTransfer.class); - // TODO: Add ConfigOption replace this field after ForSt checkpoint implementation stable - public static final int DEFAULT_THREAD_NUM = 4; - protected final ExecutorService executorService; @Nullable private final ForStFlinkFileSystem forStFs; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index f205fe21b9f0d..ca3d683a48410 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -265,7 +265,7 @@ public ForStRestoreResult restore() throws Exception { private void transferAllStateHandles(List specs) throws Exception { try (ForStStateDataTransfer transfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem())) { transfer.transferAllStateDataToDirectory( optionsContainer.getPathContainer(), diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 0e83b22a0ced2..0f6dcac1e7367 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -465,7 +465,7 @@ public ForStSyncKeyedStateBackendBuilder setRecoveryClaimMode( ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem()); if (enableIncrementalCheckpointing) { diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index 034df702554ac..42bca08e4fff8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -199,7 +199,7 @@ private ForStIncrementalSnapshotStrategy createSnapshotStrategy() CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), new TreeMap<>(), - new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM), + new ForStStateDataTransfer(5), -1); } @@ -231,7 +231,7 @@ private ForStNativeFullSnapshotStrategy createFullSnapshotStrategy() new KeyGroupRange(0, 1), CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), - new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM)); + new ForStStateDataTransfer(5)); } private FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException { From abee959dad43c7958b57ef078ee711ec03a7a5ed Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 14:07:52 +0200 Subject: [PATCH 2/7] Using 4 as a default in tests --- .../forst/snapshot/ForStIncrementalSnapshotStrategyTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index 42bca08e4fff8..ae288fbff1ca7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -199,7 +199,7 @@ private ForStIncrementalSnapshotStrategy createSnapshotStrategy() CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), new TreeMap<>(), - new ForStStateDataTransfer(5), + new ForStStateDataTransfer(4), -1); } @@ -231,7 +231,7 @@ private ForStNativeFullSnapshotStrategy createFullSnapshotStrategy() new KeyGroupRange(0, 1), CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), - new ForStStateDataTransfer(5)); + new ForStStateDataTransfer(4)); } private FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException { From 55ad2ef2d7c7dcd9363b41d9e1f3433055531dc0 Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 14:17:38 +0200 Subject: [PATCH 3/7] Added simple test --- .../forst/ForStStateBackendConfigTest.java | 119 +++++++++++------- 1 file changed, 72 insertions(+), 47 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index 624bbf0b577db..bf36a09412ff3 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -82,10 +82,13 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -/** Tests for configuring the ForSt State Backend. */ +/** + * Tests for configuring the ForSt State Backend. + */ public class ForStStateBackendConfigTest { - @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); // ------------------------------------------------------------------------ // default values @@ -98,7 +101,7 @@ public void testDefaultDbLogDir() throws Exception { // set the environment variable 'log.file' with the Flink log file location System.setProperty("log.file", logFile.getPath()); try (ForStResourceContainer container = - backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) { + backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) { assertEquals( ForStConfigurableOptions.LOG_LEVEL.defaultValue(), container.getDbOptions().infoLogLevel()); @@ -113,8 +116,8 @@ public void testDefaultDbLogDir() throws Exception { longInstanceBasePath.append("/append-for-long-path"); } try (ForStResourceContainer container = - backend.createOptionsAndResourceContainer( - new Path(longInstanceBasePath.toString()))) { + backend.createOptionsAndResourceContainer( + new Path(longInstanceBasePath.toString()))) { assertTrue(container.getDbOptions().dbLogDir().isEmpty()); } finally { logFile.delete(); @@ -125,7 +128,9 @@ public void testDefaultDbLogDir() throws Exception { // ForSt local file directory // ------------------------------------------------------------------------ - /** This test checks the behavior for basic setting of local DB directories. */ + /** + * This test checks the behavior for basic setting of local DB directories. + */ @Test public void testSetDbPath() throws Exception { final ForStStateBackend forStStateBackend = new ForStStateBackend(); @@ -136,14 +141,14 @@ public void testSetDbPath() throws Exception { assertNull(forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePath(testDir1); - assertArrayEquals(new String[] {testDir1}, forStStateBackend.getLocalDbStoragePaths()); + assertArrayEquals(new String[]{testDir1}, forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePath(null); assertNull(forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePaths(testDir1, testDir2); assertArrayEquals( - new String[] {testDir1, testDir2}, forStStateBackend.getLocalDbStoragePaths()); + new String[]{testDir1, testDir2}, forStStateBackend.getLocalDbStoragePaths()); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); final ForStKeyedStateBackend keyedBackend = @@ -169,7 +174,7 @@ public void testConfigureForStCompressionPerLevel() throws Exception { final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); ForStStateBackend forStStateBackend = new ForStStateBackend(); CompressionType[] compressionTypes = { - CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION + CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION }; Configuration conf = new Configuration(); conf.set( @@ -251,14 +256,18 @@ private void testLocalDbPaths(String configuredPath, File expectedPath) throws E } } - /** Validates that empty arguments for the local DB path are invalid. */ + /** + * Validates that empty arguments for the local DB path are invalid. + */ @Test(expected = IllegalArgumentException.class) public void testSetEmptyPaths() throws Exception { ForStStateBackend forStStateBackend = new ForStStateBackend(); forStStateBackend.setLocalDbStoragePaths(); } - /** Validates that schemes other than 'file:/' are not allowed. */ + /** + * Validates that schemes other than 'file:/' are not allowed. + */ @Test(expected = IllegalArgumentException.class) public void testNonFileSchemePath() throws Exception { ForStStateBackend forStStateBackend = new ForStStateBackend(); @@ -526,15 +535,15 @@ public void testConfigurableOptionsFromConfig() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1h"); try (ForStResourceContainer optionsContainer = - new ForStResourceContainer( - configuration, - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(-1, dbOptions.maxOpenFiles()); @@ -583,7 +592,7 @@ public void testOptionsFactory() throws Exception { assertTrue(forStStateBackend.getForStOptions() instanceof TestOptionsFactory); try (ForStResourceContainer optionsContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(4, dbOptions.maxBackgroundJobs()); } @@ -607,7 +616,7 @@ public ColumnFamilyOptions createColumnOptions( }); try (ForStResourceContainer optionsContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { ColumnFamilyOptions colCreated = optionsContainer.getColumnOptions(); assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle()); } @@ -618,15 +627,15 @@ public void testConfigurableOptions() throws Exception { Configuration configuration = new Configuration(); configuration.set(ForStConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL); try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer( - configuration, - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -634,15 +643,15 @@ public void testConfigurableOptions() throws Exception { } try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer( - new Configuration(), - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + new Configuration(), + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -669,7 +678,7 @@ public ColumnFamilyOptions createColumnOptions( }; try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer(optionsFactory)) { + new ForStResourceContainer(optionsFactory)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -688,9 +697,9 @@ public void testForStReconfigurationCopiesExistingValues() throws Exception { original.setForStOptions(optionsFactory); final String[] localDirs = - new String[] { - tempFolder.newFolder().getAbsolutePath(), - tempFolder.newFolder().getAbsolutePath() + new String[]{ + tempFolder.newFolder().getAbsolutePath(), + tempFolder.newFolder().getAbsolutePath() }; original.setLocalDbStoragePaths(localDirs); @@ -813,7 +822,7 @@ public void testConfigurePeriodicCompactionTime() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1d"); forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); try (ForStResourceContainer resourceContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { assertEquals(Duration.ofDays(1), resourceContainer.getPeriodicCompactionTime()); } } @@ -826,11 +835,25 @@ public void testConfigureQueryTimeAfterNumEntries() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES.key(), "100"); forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); try (ForStResourceContainer resourceContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { assertEquals(100L, resourceContainer.getQueryTimeAfterNumEntries().longValue()); } } + @Test + public void testConfigureCheckpointTransferThreadNumber() throws Exception { + ForStStateBackend forStStateBackend = new ForStStateBackend(); + Configuration configuration = new Configuration(); + configuration.setString( + ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.key(), "10"); + forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); + + try (ForStResourceContainer resourceContainer = + forStStateBackend.createOptionsAndResourceContainer(null)) { + assertEquals(10, resourceContainer.getDataTransferThreadNum()); + } + } + private void verifySetParameter(Runnable setter) { try { setter.run(); @@ -875,7 +898,9 @@ private void verifyIllegalArgument(ConfigOption configOption, String configVa } } - /** An implementation of options factory for testing. */ + /** + * An implementation of options factory for testing. + */ public static class TestOptionsFactory implements ConfigurableForStOptionsFactory { public static final ConfigOption BACKGROUND_JOBS_OPTION = ConfigOptions.key("my.custom.forst.backgroundJobs").intType().defaultValue(2); From a6c99b471a3029cdc7115f3736a3fd355edab9c3 Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 14:21:27 +0200 Subject: [PATCH 4/7] Removed formatting changes --- .../forst/ForStStateBackendConfigTest.java | 105 ++++++++---------- 1 file changed, 47 insertions(+), 58 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index bf36a09412ff3..2a5eb8c9dcf9c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -82,13 +82,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -/** - * Tests for configuring the ForSt State Backend. - */ +/** Tests for configuring the ForSt State Backend. */ public class ForStStateBackendConfigTest { - @Rule - public final TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); // ------------------------------------------------------------------------ // default values @@ -101,7 +98,7 @@ public void testDefaultDbLogDir() throws Exception { // set the environment variable 'log.file' with the Flink log file location System.setProperty("log.file", logFile.getPath()); try (ForStResourceContainer container = - backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) { + backend.createOptionsAndResourceContainer(new Path(tempFolder.toString()))) { assertEquals( ForStConfigurableOptions.LOG_LEVEL.defaultValue(), container.getDbOptions().infoLogLevel()); @@ -116,8 +113,8 @@ public void testDefaultDbLogDir() throws Exception { longInstanceBasePath.append("/append-for-long-path"); } try (ForStResourceContainer container = - backend.createOptionsAndResourceContainer( - new Path(longInstanceBasePath.toString()))) { + backend.createOptionsAndResourceContainer( + new Path(longInstanceBasePath.toString()))) { assertTrue(container.getDbOptions().dbLogDir().isEmpty()); } finally { logFile.delete(); @@ -128,9 +125,7 @@ public void testDefaultDbLogDir() throws Exception { // ForSt local file directory // ------------------------------------------------------------------------ - /** - * This test checks the behavior for basic setting of local DB directories. - */ + /** This test checks the behavior for basic setting of local DB directories. */ @Test public void testSetDbPath() throws Exception { final ForStStateBackend forStStateBackend = new ForStStateBackend(); @@ -141,14 +136,14 @@ public void testSetDbPath() throws Exception { assertNull(forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePath(testDir1); - assertArrayEquals(new String[]{testDir1}, forStStateBackend.getLocalDbStoragePaths()); + assertArrayEquals(new String[] {testDir1}, forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePath(null); assertNull(forStStateBackend.getLocalDbStoragePaths()); forStStateBackend.setLocalDbStoragePaths(testDir1, testDir2); assertArrayEquals( - new String[]{testDir1, testDir2}, forStStateBackend.getLocalDbStoragePaths()); + new String[] {testDir1, testDir2}, forStStateBackend.getLocalDbStoragePaths()); final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); final ForStKeyedStateBackend keyedBackend = @@ -174,7 +169,7 @@ public void testConfigureForStCompressionPerLevel() throws Exception { final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); ForStStateBackend forStStateBackend = new ForStStateBackend(); CompressionType[] compressionTypes = { - CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION + CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION }; Configuration conf = new Configuration(); conf.set( @@ -256,18 +251,14 @@ private void testLocalDbPaths(String configuredPath, File expectedPath) throws E } } - /** - * Validates that empty arguments for the local DB path are invalid. - */ + /** Validates that empty arguments for the local DB path are invalid. */ @Test(expected = IllegalArgumentException.class) public void testSetEmptyPaths() throws Exception { ForStStateBackend forStStateBackend = new ForStStateBackend(); forStStateBackend.setLocalDbStoragePaths(); } - /** - * Validates that schemes other than 'file:/' are not allowed. - */ + /** Validates that schemes other than 'file:/' are not allowed. */ @Test(expected = IllegalArgumentException.class) public void testNonFileSchemePath() throws Exception { ForStStateBackend forStStateBackend = new ForStStateBackend(); @@ -535,15 +526,15 @@ public void testConfigurableOptionsFromConfig() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1h"); try (ForStResourceContainer optionsContainer = - new ForStResourceContainer( - configuration, - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(-1, dbOptions.maxOpenFiles()); @@ -592,7 +583,7 @@ public void testOptionsFactory() throws Exception { assertTrue(forStStateBackend.getForStOptions() instanceof TestOptionsFactory); try (ForStResourceContainer optionsContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(4, dbOptions.maxBackgroundJobs()); } @@ -616,7 +607,7 @@ public ColumnFamilyOptions createColumnOptions( }); try (ForStResourceContainer optionsContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { ColumnFamilyOptions colCreated = optionsContainer.getColumnOptions(); assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle()); } @@ -627,15 +618,15 @@ public void testConfigurableOptions() throws Exception { Configuration configuration = new Configuration(); configuration.set(ForStConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL); try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer( - configuration, - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + configuration, + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -643,15 +634,15 @@ public void testConfigurableOptions() throws Exception { } try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer( - new Configuration(), - null, - null, - ForStPathContainer.empty(), - null, - null, - null, - false)) { + new ForStResourceContainer( + new Configuration(), + null, + null, + ForStPathContainer.empty(), + null, + null, + null, + false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -678,7 +669,7 @@ public ColumnFamilyOptions createColumnOptions( }; try (final ForStResourceContainer optionsContainer = - new ForStResourceContainer(optionsFactory)) { + new ForStResourceContainer(optionsFactory)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); assertNotNull(columnFamilyOptions); @@ -697,9 +688,9 @@ public void testForStReconfigurationCopiesExistingValues() throws Exception { original.setForStOptions(optionsFactory); final String[] localDirs = - new String[]{ - tempFolder.newFolder().getAbsolutePath(), - tempFolder.newFolder().getAbsolutePath() + new String[] { + tempFolder.newFolder().getAbsolutePath(), + tempFolder.newFolder().getAbsolutePath() }; original.setLocalDbStoragePaths(localDirs); @@ -822,7 +813,7 @@ public void testConfigurePeriodicCompactionTime() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1d"); forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); try (ForStResourceContainer resourceContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { assertEquals(Duration.ofDays(1), resourceContainer.getPeriodicCompactionTime()); } } @@ -835,7 +826,7 @@ public void testConfigureQueryTimeAfterNumEntries() throws Exception { ForStConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES.key(), "100"); forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); try (ForStResourceContainer resourceContainer = - forStStateBackend.createOptionsAndResourceContainer(null)) { + forStStateBackend.createOptionsAndResourceContainer(null)) { assertEquals(100L, resourceContainer.getQueryTimeAfterNumEntries().longValue()); } } @@ -898,9 +889,7 @@ private void verifyIllegalArgument(ConfigOption configOption, String configVa } } - /** - * An implementation of options factory for testing. - */ + /** An implementation of options factory for testing. */ public static class TestOptionsFactory implements ConfigurableForStOptionsFactory { public static final ConfigOption BACKGROUND_JOBS_OPTION = ConfigOptions.key("my.custom.forst.backgroundJobs").intType().defaultValue(2); From 963820166c9bb9ab5ea48059de27f041d9d7a7bc Mon Sep 17 00:00:00 2001 From: Francis Date: Wed, 10 Jun 2026 16:02:47 +0200 Subject: [PATCH 5/7] Update flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java Co-authored-by: Purushottam Sinha --- .../forst/snapshot/ForStIncrementalSnapshotStrategyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index ae288fbff1ca7..b180ba68e57b4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -199,7 +199,7 @@ private ForStIncrementalSnapshotStrategy createSnapshotStrategy() CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), new TreeMap<>(), - new ForStStateDataTransfer(4), + new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()), -1); } From d440622dd5963121b49465214e11cf0f3251c3a8 Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 16:04:06 +0200 Subject: [PATCH 6/7] Using config default --- .../forst/snapshot/ForStIncrementalSnapshotStrategyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index b180ba68e57b4..e0e738b4540fb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.state.forst.ForStExtension; import org.apache.flink.state.forst.ForStOperationUtils; +import org.apache.flink.state.forst.ForStOptions; import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer; import org.apache.flink.testutils.junit.utils.TempDirUtils; @@ -231,7 +232,7 @@ private ForStNativeFullSnapshotStrategy createFullSnapshotStrategy() new KeyGroupRange(0, 1), CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), - new ForStStateDataTransfer(4)); + new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue())); } private FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException { From 20aa253408c01de7bbb8ee12f25fe0ebb07b973c Mon Sep 17 00:00:00 2001 From: Francis Altomare Date: Wed, 10 Jun 2026 16:12:17 +0200 Subject: [PATCH 7/7] Adding generated documentation --- docs/layouts/shortcodes/generated/expert_forst_section.html | 6 ++++++ docs/layouts/shortcodes/generated/forst_configuration.html | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/docs/layouts/shortcodes/generated/expert_forst_section.html b/docs/layouts/shortcodes/generated/expert_forst_section.html index 81ec199945840..f9b7acec173b0 100644 --- a/docs/layouts/shortcodes/generated/expert_forst_section.html +++ b/docs/layouts/shortcodes/generated/expert_forst_section.html @@ -20,6 +20,12 @@ Integer When the number of eviction that a block in hot link is moved to cold link reaches this value, the block will be blocked from being promoted to the head of the LRU list. The default value is '3'. + +
state.backend.forst.checkpoint.transfer.thread.num
+ 4 + Integer + The number of transfer threads used to write or copy files to the state backend. +
state.backend.forst.executor.inline-coordinator
false diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html b/docs/layouts/shortcodes/generated/forst_configuration.html index f66c1af4d8b3b..c1ce466a793a4 100644 --- a/docs/layouts/shortcodes/generated/forst_configuration.html +++ b/docs/layouts/shortcodes/generated/forst_configuration.html @@ -38,6 +38,12 @@ MemorySize An upper-bound of the size that can be used for cache. User should specify at least one cache size limit to enable the cache, either this option or the 'state.backend.forst.cache.reserve-size' option. They can be set simultaneously, and in this case, cache will grow if meet the requirements of both two options. The default value is '0 bytes', meaning that this option is disabled. + +
state.backend.forst.checkpoint.transfer.thread.num
+ 4 + Integer + The number of transfer threads used to write or copy files to the state backend. +
state.backend.forst.executor.inline-coordinator
false