diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java index 394e1e81ca1e..d1fb47264297 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java @@ -54,7 +54,7 @@ public String getCreateTableSql( return super.getCreateTableSql( template, database, table, tableSchema, comment, optionsKey); } finally { - pkColumns.clear(); + PRIMARY_KEY_COLUMNS.remove(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java index c36b1d652dfb..12d335566e1b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java @@ -18,11 +18,20 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; @@ -101,4 +110,74 @@ void throwsExceptionWhenColumnIsNull() { NullPointerException.class, () -> ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null)); } + + @Test + void testPrimaryKeyColumnShouldNotBeNullable() { + // Test that ThreadLocal is properly cleared after getCreateTableSql call + Column column = mock(Column.class); + when(column.getName()).thenReturn("pk_column"); + when(column.getSinkType()).thenReturn("String"); + when(column.isNullable()).thenReturn(true); + when(column.getComment()).thenReturn(""); + + List columns = new ArrayList<>(); + columns.add(column); + + TableSchema tableSchema = + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Collections.singletonList("pk_column"))) + .columns(columns) + .build(); + + ClickhouseCatalogUtil.INSTANCE.getCreateTableSql( + "CREATE TABLE `${database}`.`${table}` (${rowtype_fields})", + "test_db", + "test_table", + tableSchema, + null, + ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + + // After getCreateTableSql call, ThreadLocal should be cleared + // so columnToConnectorType should treat it as NOT a primary key + String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); + assertEquals("`pk_column` Nullable(String) ", result); + } + + @Test + void testPrimaryKeyColumnWithNullableShouldNotWrapInNullable() { + // Test the actual scenario: primary key columns should NOT be wrapped in Nullable + // because ClickHouse doesn't allow nullable columns in ORDER BY / PRIMARY KEY + String template = + "CREATE TABLE `${database}`.`${table}` (\n" + + " ${rowtype_primary_key},\n" + + " ${rowtype_fields}\n" + + ") ENGINE = MergeTree()\n" + + "ORDER BY (${rowtype_primary_key})"; + + List columns = new ArrayList<>(); + columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, true, null, "")); + + TableSchema tableSchema = + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) + .columns(columns) + .build(); + + String sql = + ClickhouseCatalogUtil.INSTANCE.getCreateTableSql( + template, + "test_db", + "test_table", + tableSchema, + null, + ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + + // Primary key columns (id, age) should NOT be wrapped in Nullable + assertEquals(true, sql.contains("`id` Int64 ")); + assertEquals(true, sql.contains("`age` Int32 ")); + // Non-primary key column (name) should be wrapped in Nullable + assertEquals(true, sql.contains("`name` Nullable(String) ")); + } }