diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java index 617e6c526..ed73fedb6 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java @@ -106,11 +106,12 @@ public Serializable[][] getParameterValues() { Serializable[][] parameters = new Serializable[batchNum][2]; long start = minVal; - for (int i = 0; i < batchNum; i++) { + for (int i = 0; i < batchNum - 1; i++) { long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0); parameters[i] = new Long[] {start, end}; start = end + 1; } + parameters[batchNum - 1] = new Long[] {start, maxVal}; return parameters; } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java index 25e350e1a..fc5fcbb12 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java @@ -115,6 +115,21 @@ void testBatchNumTooLarge() { check(expected, actual); } + @Test + void testBatchMaxMinTooLarge() { + JdbcNumericBetweenParametersProvider provider = + new JdbcNumericBetweenParametersProvider(2260418954055131340L, 3875220057236942850L) + .ofBatchSize(3); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[] {2260418954055131340L, 2798685988449068510L}, + new long[] {2798685988449068511L, 3336953022843005681L}, + new long[] {3336953022843005682L, 3875220057236942850L} + }; + check(expected, actual); + } + private void check(long[][] expected, Serializable[][] actual) { assertThat(actual).hasDimensions(expected.length, expected[0].length); for (int i = 0; i < expected.length; i++) {