Skip to content

Commit 3e79b2e

Browse files
authored
[client] Set the default value of 'scan.partition.discovery.interval' to 1min to reduce the server load (#1736)
1 parent 78ba99b commit 3e79b2e

File tree

5 files changed

+43
-20
lines changed

5 files changed

+43
-20
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,15 @@ public class FlinkConnectorOptions {
9595
public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL =
9696
ConfigOptions.key("scan.partition.discovery.interval")
9797
.durationType()
98-
.defaultValue(Duration.ofSeconds(10))
98+
.defaultValue(Duration.ofMinutes(1))
9999
.withDescription(
100100
"The time interval for the Fluss source to discover "
101101
+ "the new partitions for partitioned table while scanning."
102-
+ " A non-positive value disables the partition discovery.");
102+
+ " A non-positive value disables the partition discovery. The default value is 1 "
103+
+ "minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large "
104+
+ "number of requests to ZooKeeper in server, this option cannot be set too small, "
105+
+ "as a small value would cause frequent requests and increase server load. In the future, "
106+
+ "once list partitions is optimized, the default value of this parameter can be reduced.");
103107

104108
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
105109
ConfigOptions.key("sink.ignore-delete")

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,8 +566,15 @@ void testWritePartitionedTable(boolean isPrimaryKeyTable, boolean isAutoPartitio
566566
tableName, String.join(", ", insertValues)))
567567
.await();
568568

569+
// This test requires dynamically discovering newly created partitions, so
570+
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
571+
// otherwise the test may hang for 1 minute.
569572
CloseableIterator<Row> rowIter =
570-
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
573+
tEnv.executeSql(
574+
String.format(
575+
"select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
576+
tableName))
577+
.collect();
571578
assertResultsIgnoreOrder(rowIter, expectedRows, false);
572579

573580
// create two partitions, write data to the new partitions

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,8 +607,15 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
607607
writeRowsToPartition(conn, tablePath, partitionNameById.values());
608608
waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
609609

610+
// This test requires dynamically discovering newly created partitions, so
611+
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
612+
// otherwise the test may hang for 1 minute.
610613
org.apache.flink.util.CloseableIterator<Row> rowIter =
611-
tEnv.executeSql(String.format("select * from %s", tableName)).collect();
614+
tEnv.executeSql(
615+
String.format(
616+
"select * from %s /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */",
617+
tableName))
618+
.collect();
612619
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
613620

614621
// then create some new partitions, and write rows to the new partitions
@@ -1047,8 +1054,13 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
10471054
+ "project=[a, b, d]]], fields=[a, b, d])");
10481055

10491056
// test partition key prefix match
1057+
// This test requires dynamically discovering newly created partitions, so
1058+
// 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute),
1059+
// otherwise the test may hang for 1 minute.
10501060
org.apache.flink.util.CloseableIterator<Row> rowIter =
1051-
tEnv.executeSql("select * from multi_partitioned_table where c ='2025'").collect();
1061+
tEnv.executeSql(
1062+
"select * from multi_partitioned_table /*+ OPTIONS('scan.partition.discovery.interval' = '2s') */ where c ='2025'")
1063+
.collect();
10521064

10531065
assertResultsIgnoreOrder(rowIter, expectedRowValues, false);
10541066

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlussSourceBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void testMissingScanPartitionDiscoveryInterval() {
192192
.build();
193193

194194
// Then
195-
assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(10000L);
195+
assertThat(source.scanPartitionDiscoveryIntervalMs).isEqualTo(60000L);
196196
}
197197

198198
@Test

0 commit comments

Comments
 (0)