From a322f2f163c6d69cebe5bcd178ea9df3402a9690 Mon Sep 17 00:00:00 2001 From: sleepy <649814031@qq.com> Date: Tue, 8 Apr 2025 15:35:33 +0800 Subject: [PATCH 1/3] [hotfix] sqlserver limit statement support --- .../jdbc/core/table/source/JdbcDynamicTableSource.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java index d26898a53..9ef9e9d23 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java @@ -179,7 +179,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon } if (limit >= 0) { - query = String.format("%s %s", query, dialect.getLimitClause(limit)); + if ("SqlServer".equals(this.dialectName)) { + query = query.replace("SELECT",String.format("SELECT TOP %s ",limit)); + } + else{ + query = String.format("%s %s", query, dialect.getLimitClause(limit)); + } } LOG.debug("Query generated for JDBC scan: " + query); From 3954deba1084a63a4136a17762d48bc6befe8cfb Mon Sep 17 00:00:00 2001 From: sleepy <649814031@qq.com> Date: Fri, 18 Apr 2025 17:56:15 +0800 Subject: [PATCH 2/3] [hotfix] sqlserver limit statement support --- .../jdbc/core/database/dialect/JdbcDialect.java | 11 +++++++++++ .../core/table/source/JdbcDynamicTableSource.java | 7 +------ .../sqlserver/database/dialect/SqlServerDialect.java | 7 ++++++- .../dialect/SqlServerPreparedStatementTest.java | 7 +++++++ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java index d3060076e..1583cbd6d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java @@ -58,6 +58,16 @@ public interface JdbcDialect extends Serializable { */ String getLimitClause(long limit); + /** + * Get the way of add limit clause. the default way of append by origin sql end. + * @param query origin query sql + * @param limit number of row to emit. The value of the parameter should be non-negative. + * @return the entire sql after adding limit clause. + */ + default String addLimitClause(String query, long limit) { + return String.format("%s %s", query, getLimitClause(limit)); + } + /** * Check if this dialect instance support a specific data type in table schema. * @@ -154,4 +164,5 @@ String getSelectFromStatement( default String appendDefaultUrlProperties(String url) { return url; } + } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java index 9ef9e9d23..3364a7f15 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java @@ -179,12 +179,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon } if (limit >= 0) { - if ("SqlServer".equals(this.dialectName)) { - query = query.replace("SELECT",String.format("SELECT TOP %s ",limit)); - } - else{ - query = String.format("%s %s", query, dialect.getLimitClause(limit)); - } + query = dialect.addLimitClause(query, limit); } LOG.debug("Query generated for JDBC scan: " + query); diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java index 41fd3560b..9934b9749 100644 --- a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java @@ -132,7 +132,12 @@ public JdbcDialectConverter getRowConverter(RowType rowType) { @Override public String getLimitClause(long limit) { - throw new IllegalArgumentException("SqlServerDialect does not support limit clause"); + return String.format("SELECT TOP %s", limit); + } + + @Override + public String addLimitClause(String query, long limit) { + return query.replace("SELECT", getLimitClause(limit)); } @Override diff --git a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java index 814d65c92..4b9fb8675 100644 --- a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java +++ b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java @@ -91,4 +91,11 @@ void testSelectStatement() { "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " + "WHERE id = :id AND __field_3__ = :__field_3__"); } + + @Test + void testLimitStatement() { + String selectStmt = dialect.addLimitClause("SELECT * FROM TBL", 10); + assertThat(selectStmt) + .isEqualTo("SELECT TOP 10 * FROM TBL"); + } } From 77893606bace9f095f5346afbdee9f4011f21f68 Mon Sep 17 00:00:00 2001 From: sleepy <649814031@qq.com> Date: Mon, 28 Apr 2025 13:39:46 +0800 Subject: [PATCH 3/3] [hotfix] sqlserver limit statement support --- .../jdbc/core/database/dialect/AbstractDialect.java | 10 ++++++++++ .../jdbc/core/database/dialect/JdbcDialect.java | 6 ++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java index bae568956..98c4107c3 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java @@ -265,4 +265,14 @@ private Range(int min, int max) { this.max = max; } } + + /** + * The default way of append by origin sql end. + * @param query origin query sql + * @param limit number of row to emit. The value of the parameter should be non-negative. + * @return the entire sql after adding limit clause. + */ + public String addLimitClause(String query, long limit) { + return String.format("%s %s", query, getLimitClause(limit)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java index 1583cbd6d..a92fdf1aa 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java @@ -59,14 +59,12 @@ public interface JdbcDialect extends Serializable { String getLimitClause(long limit); /** - * Get the way of add limit clause. the default way of append by origin sql end. + * Get the way of add limit clause. * @param query origin query sql * @param limit number of row to emit. The value of the parameter should be non-negative. * @return the entire sql after adding limit clause. */ - default String addLimitClause(String query, long limit) { - return String.format("%s %s", query, getLimitClause(limit)); - } + String addLimitClause(String query, long limit); /** * Check if this dialect instance support a specific data type in table schema.