diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java index bae568956..98c4107c3 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/AbstractDialect.java @@ -265,4 +265,14 @@ private Range(int min, int max) { this.max = max; } } + + /** + * The default way of append by origin sql end. + * @param query origin query sql + * @param limit number of row to emit. The value of the parameter should be non-negative. + * @return the entire sql after adding limit clause. + */ + public String addLimitClause(String query, long limit) { + return String.format("%s %s", query, getLimitClause(limit)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java index d3060076e..a92fdf1aa 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/dialect/JdbcDialect.java @@ -58,6 +58,14 @@ public interface JdbcDialect extends Serializable { */ String getLimitClause(long limit); + /** + * Get the way of add limit clause. + * @param query origin query sql + * @param limit number of row to emit. The value of the parameter should be non-negative. + * @return the entire sql after adding limit clause. + */ + String addLimitClause(String query, long limit); + /** * Check if this dialect instance support a specific data type in table schema. * @@ -154,4 +162,5 @@ String getSelectFromStatement( default String appendDefaultUrlProperties(String url) { return url; } + } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java index d26898a53..3364a7f15 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java @@ -179,7 +179,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon } if (limit >= 0) { - query = String.format("%s %s", query, dialect.getLimitClause(limit)); + query = dialect.addLimitClause(query, limit); } LOG.debug("Query generated for JDBC scan: " + query); diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java index 41fd3560b..9934b9749 100644 --- a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerDialect.java @@ -132,7 +132,12 @@ public JdbcDialectConverter getRowConverter(RowType rowType) { @Override public String getLimitClause(long limit) { - throw new IllegalArgumentException("SqlServerDialect does not support limit clause"); + return String.format("SELECT TOP %s", limit); + } + + @Override + public String addLimitClause(String query, long limit) { + return query.replace("SELECT", getLimitClause(limit)); } @Override diff --git a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java index 814d65c92..4b9fb8675 100644 --- a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java +++ b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/database/dialect/SqlServerPreparedStatementTest.java @@ -91,4 +91,11 @@ void testSelectStatement() { "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " + "WHERE id = :id AND __field_3__ = :__field_3__"); } + + @Test + void testLimitStatement() { + String selectStmt = dialect.addLimitClause("SELECT * FROM TBL", 10); + assertThat(selectStmt) + .isEqualTo("SELECT TOP 10 * FROM TBL"); + } }