Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit 0b45bc9

Browse files
yzeng1618 and zengyi authored
[Fix][connector-jdbc] Fix CatalogUtils getCatalogTable(Connection, String, ...) losing primary key for query-only sources (apache#10093)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent e4ec14a commit 0b45bc9

File tree

9 files changed

+365
-6
lines changed

9 files changed

+365
-6
lines changed

docs/en/connector-v2/source/Jdbc.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ The JDBC Source connector supports parallel reading of data from tables. SeaTunn
243243
> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run with single concurrency.
244244
>
245245
> Use `table_path` to replace `query` for single table reading. If you need to read multiple tables, use `table_list`.
246+
>
247+
> When a primary key is inferred from a `query`, the key is inherited from the underlying table that the first column of the result set belongs to. Its uniqueness across the whole result set is not guaranteed (for example, when the query contains joins or reads from multiple tables).
246248
247249
## appendix
248250

docs/en/connector-v2/source/Mysql.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ How many splits do we need to split into, only support positive integer. default
176176
> If the table cannot be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run with single concurrency.
177177
>
178178
> Use `table_path` to replace `query` for single table reading. If you need to read multiple tables, use `table_list`.
179+
>
180+
> When a primary key is inferred from a `query`, the key is inherited from the underlying table that the first column of the result set belongs to. Its uniqueness across the whole result set is not guaranteed (for example, when the query contains joins or reads from multiple tables).
179181
180182
## Task Example
181183

docs/zh/connector-v2/source/Jdbc.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用某些
236236
> 如果表无法分割(例如,表没有主键或唯一索引,且未设置 `partition_column`),它将以单并发运行。
237237
>
238238
> 使用 `table_path` 替换 `query` 进行单表读取。如果需要读取多个表,请使用 `table_list`。
239+
> 当基于 `query` 推断主键时,主键继承自结果集中第一列所在的底层表;如果 `query` 包含多表 JOIN 或同时从多张表读取,该主键对整个 JOIN 结果集的唯一性不作严格保证。
239240
240241
## 附录
241242

docs/zh/connector-v2/source/Mysql.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用特定
177177
> 如果表无法拆分(例如,表没有主键或唯一索引,且未设置 `partition_column`),则将以单线程并发方式运行。
178178
>
179179
> 使用 `table_path` 替代 `query` 来进行单表读取。如果需要读取多个表,请使用 `table_list`。
180+
> 当基于 `query` 推断主键时,主键继承自结果集中第一列所在的底层表;如果 `query` 包含多表 JOIN 或同时从多张表读取,该主键对整个 JOIN 结果集的唯一性不作严格保证。
180181
181182
## 任务示例
182183

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.List;
5151
import java.util.Map;
5252
import java.util.Optional;
53+
import java.util.Set;
5354
import java.util.function.BiFunction;
5455
import java.util.stream.Collectors;
5556

@@ -333,7 +334,36 @@ public static CatalogTable getCatalogTable(
333334
Connection connection, String sqlQuery, JdbcDialectTypeMapper typeMapper)
334335
throws SQLException {
335336
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
336-
return getCatalogTable(ps.getMetaData(), typeMapper, sqlQuery);
337+
ResultSetMetaData resultSetMetaData = ps.getMetaData();
338+
CatalogTable catalogTable = getCatalogTable(resultSetMetaData, typeMapper, sqlQuery);
339+
340+
PrimaryKey primaryKey = extractPrimaryKey(connection, resultSetMetaData, sqlQuery);
341+
if (primaryKey == null) {
342+
return catalogTable;
343+
}
344+
345+
Set<String> queryColumns =
346+
catalogTable.getTableSchema().getColumns().stream()
347+
.map(Column::getName)
348+
.collect(Collectors.toSet());
349+
if (!queryColumns.containsAll(primaryKey.getColumnNames())) {
350+
return catalogTable;
351+
}
352+
353+
TableSchema newSchema =
354+
TableSchema.builder()
355+
.columns(catalogTable.getTableSchema().getColumns())
356+
.primaryKey(primaryKey)
357+
.constraintKey(catalogTable.getTableSchema().getConstraintKeys())
358+
.build();
359+
360+
return CatalogTable.of(
361+
catalogTable.getTableId(),
362+
newSchema,
363+
catalogTable.getOptions(),
364+
catalogTable.getPartitionKeys(),
365+
catalogTable.getComment(),
366+
catalogTable.getCatalogName());
337367
}
338368
}
339369

@@ -353,4 +383,32 @@ public static CatalogTable getCatalogTable(Connection connection, String sqlQuer
353383
return getCatalogTable(resultSetMetaData, sqlQuery);
354384
}
355385
}
386+
387+
private static PrimaryKey extractPrimaryKey(
388+
Connection connection, ResultSetMetaData resultSetMetaData, String sqlQuery) {
389+
try {
390+
String tableName = resultSetMetaData.getTableName(1);
391+
if (StringUtils.isBlank(tableName)) {
392+
return null;
393+
}
394+
395+
String databaseName = resultSetMetaData.getCatalogName(1);
396+
String schemaName = resultSetMetaData.getSchemaName(1);
397+
DatabaseMetaData dbMetaData = connection.getMetaData();
398+
399+
TablePath tablePath =
400+
TablePath.of(
401+
StringUtils.isBlank(databaseName) ? null : databaseName,
402+
StringUtils.isBlank(schemaName) ? null : schemaName,
403+
tableName);
404+
405+
return getPrimaryKey(dbMetaData, tablePath).orElse(null);
406+
} catch (SQLException e) {
407+
log.debug(
408+
"Failed to extract primary key from database metadata for sql: {}",
409+
sqlQuery,
410+
e);
411+
return null;
412+
}
413+
}
356414
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,12 @@ protected Object queryMin(JdbcSourceTable table, String columnName, Object exclu
194194

195195
columnName = jdbcDialect.quoteIdentifier(columnName);
196196
columnName = jdbcDialect.convertType(columnName, column.getSourceType());
197-
if (StringUtils.isNotBlank(table.getQuery())) {
197+
String query = normalizeQuery(table.getQuery());
198+
if (StringUtils.isNotBlank(query)) {
198199
minQuery =
199200
String.format(
200201
"SELECT MIN(%s) FROM (%s) tmp WHERE %s > ?",
201-
columnName, table.getQuery(), columnName);
202+
columnName, query, columnName);
202203
} else {
203204
minQuery =
204205
String.format(
@@ -232,11 +233,11 @@ protected Pair<Object, Object> queryMinMax(JdbcSourceTable table, String columnN
232233

233234
columnName = jdbcDialect.quoteIdentifier(columnName);
234235
columnName = jdbcDialect.convertType(columnName, column.getSourceType());
235-
if (StringUtils.isNotBlank(table.getQuery())) {
236+
String query = normalizeQuery(table.getQuery());
237+
if (StringUtils.isNotBlank(query)) {
236238
sqlQuery =
237239
String.format(
238-
"SELECT MIN(%s), MAX(%s) FROM (%s) tmp",
239-
columnName, columnName, table.getQuery());
240+
"SELECT MIN(%s), MAX(%s) FROM (%s) tmp", columnName, columnName, query);
240241
} else {
241242
sqlQuery =
242243
String.format(
@@ -260,6 +261,11 @@ protected Pair<Object, Object> queryMinMax(JdbcSourceTable table, String columnN
260261
}
261262

262263
protected Optional<SeaTunnelRowType> findSplitKey(JdbcSourceTable table) {
264+
if (StringUtils.isNotBlank(table.getQuery()) && table.getPartitionColumn() == null) {
265+
// Keep query-based tables on single split unless user explicitly sets partition column
266+
return Optional.empty();
267+
}
268+
263269
TableSchema schema = table.getCatalogTable().getTableSchema();
264270
List<Column> columns = schema.getColumns();
265271
Map<String, Column> columnMap =
@@ -350,6 +356,14 @@ protected boolean isSupportSplitColumn(Column splitColumn) {
350356
}
351357
}
352358

359+
private String normalizeQuery(String query) {
360+
if (StringUtils.isEmpty(query)) {
361+
return query;
362+
}
363+
// Avoid trailing semicolons/whitespace breaking wrapped subqueries
364+
return StringUtils.stripEnd(query, " \t\r\n;");
365+
}
366+
353367
protected String createSplitId(TablePath tablePath, int index) {
354368
return String.format("%s-%s", tablePath, index);
355369
}

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
1919

20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2021
import org.apache.seatunnel.api.table.catalog.Column;
2122
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
2223
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -30,10 +31,16 @@
3031
import org.junit.jupiter.api.Assertions;
3132
import org.junit.jupiter.api.Test;
3233

34+
import java.sql.Connection;
35+
import java.sql.PreparedStatement;
36+
import java.sql.ResultSetMetaData;
3337
import java.sql.SQLException;
3438
import java.util.List;
3539
import java.util.Optional;
3640

41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.when;
43+
3744
public class CatalogUtilsTest {
3845

3946
@Test
@@ -83,4 +90,89 @@ public Column mappingColumn(BasicTypeDefine typeDefine) {
8390
});
8491
Assertions.assertEquals("id comment", tableSchema2.getColumns().get(0).getComment());
8592
}
93+
94+
@Test
95+
void testGetCatalogTableWithPrimaryKeyFromQuery() throws SQLException {
96+
Connection connection = mock(Connection.class);
97+
PreparedStatement preparedStatement = mock(PreparedStatement.class);
98+
ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
99+
100+
when(connection.prepareStatement("select id, name from test_table"))
101+
.thenReturn(preparedStatement);
102+
when(preparedStatement.getMetaData()).thenReturn(resultSetMetaData);
103+
104+
when(resultSetMetaData.getColumnCount()).thenReturn(2);
105+
when(resultSetMetaData.getColumnLabel(1)).thenReturn("id");
106+
when(resultSetMetaData.getColumnLabel(2)).thenReturn("name");
107+
when(resultSetMetaData.getTableName(1)).thenReturn("test_table");
108+
when(resultSetMetaData.getCatalogName(1)).thenReturn("test_db");
109+
when(resultSetMetaData.getSchemaName(1)).thenReturn(null);
110+
when(resultSetMetaData.isNullable(1)).thenReturn(ResultSetMetaData.columnNullable);
111+
when(resultSetMetaData.isNullable(2)).thenReturn(ResultSetMetaData.columnNullable);
112+
113+
when(connection.getMetaData()).thenReturn(new TestDatabaseMetaData());
114+
115+
JdbcDialectTypeMapper typeMapper =
116+
new JdbcDialectTypeMapper() {
117+
@Override
118+
public Column mappingColumn(BasicTypeDefine typeDefine) {
119+
return PhysicalColumn.of(
120+
typeDefine.getName(),
121+
BasicType.VOID_TYPE,
122+
typeDefine.getLength(),
123+
typeDefine.isNullable(),
124+
null,
125+
null);
126+
}
127+
};
128+
129+
CatalogTable catalogTable =
130+
CatalogUtils.getCatalogTable(
131+
connection, "select id, name from test_table", typeMapper);
132+
133+
PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
134+
Assertions.assertNotNull(primaryKey);
135+
Assertions.assertEquals("testfdawe_", primaryKey.getPrimaryKey());
136+
Assertions.assertEquals(1, primaryKey.getColumnNames().size());
137+
Assertions.assertEquals("id", primaryKey.getColumnNames().get(0));
138+
}
139+
140+
@Test
141+
void testGetCatalogTableNotApplyPrimaryKeyWhenMissingColumns() throws SQLException {
142+
Connection connection = mock(Connection.class);
143+
PreparedStatement preparedStatement = mock(PreparedStatement.class);
144+
ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
145+
146+
when(connection.prepareStatement("select name from test_table"))
147+
.thenReturn(preparedStatement);
148+
when(preparedStatement.getMetaData()).thenReturn(resultSetMetaData);
149+
150+
when(resultSetMetaData.getColumnCount()).thenReturn(1);
151+
when(resultSetMetaData.getColumnLabel(1)).thenReturn("name");
152+
when(resultSetMetaData.getTableName(1)).thenReturn("test_table");
153+
when(resultSetMetaData.getCatalogName(1)).thenReturn("test_db");
154+
when(resultSetMetaData.getSchemaName(1)).thenReturn(null);
155+
when(resultSetMetaData.isNullable(1)).thenReturn(ResultSetMetaData.columnNullable);
156+
157+
when(connection.getMetaData()).thenReturn(new TestDatabaseMetaData());
158+
159+
JdbcDialectTypeMapper typeMapper =
160+
new JdbcDialectTypeMapper() {
161+
@Override
162+
public Column mappingColumn(BasicTypeDefine typeDefine) {
163+
return PhysicalColumn.of(
164+
typeDefine.getName(),
165+
BasicType.VOID_TYPE,
166+
typeDefine.getLength(),
167+
typeDefine.isNullable(),
168+
null,
169+
null);
170+
}
171+
};
172+
173+
CatalogTable catalogTable =
174+
CatalogUtils.getCatalogTable(connection, "select name from test_table", typeMapper);
175+
176+
Assertions.assertNull(catalogTable.getTableSchema().getPrimaryKey());
177+
}
86178
}

0 commit comments

Comments
 (0)