diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 07e19fc13b5b..322707e8c8f0 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -449,7 +449,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 120 + timeout-minutes: 180 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} @@ -510,7 +510,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 120 + timeout-minutes: 180 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} @@ -540,7 +540,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 120 + timeout-minutes: 180 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java index 06078a62d10a..394e1e81ca1e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java @@ -21,15 +21,43 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter; import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil; +import java.util.HashSet; +import java.util.Set; + import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; public class ClickhouseCatalogUtil extends CatalogUtil { + private static final ThreadLocal> PRIMARY_KEY_COLUMNS = + ThreadLocal.withInitial(HashSet::new); + public static final ClickhouseCatalogUtil INSTANCE = new ClickhouseCatalogUtil(); + @Override + public String getCreateTableSql( + String template, + String database, + String table, + TableSchema tableSchema, + String comment, + String optionsKey) { + Set pkColumns = PRIMARY_KEY_COLUMNS.get(); + pkColumns.clear(); + if (tableSchema.getPrimaryKey() != null) { + pkColumns.addAll(tableSchema.getPrimaryKey().getColumnNames()); + } + try { + return super.getCreateTableSql( + template, database, table, tableSchema, comment, optionsKey); + } finally { + pkColumns.clear(); + } + } + public String columnToConnectorType(Column column) { checkNotNull(column, "The column is required."); String columnType; @@ -38,6 +66,14 @@ public String columnToConnectorType(Column column) { } else { columnType = ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(); } + + Set pkColumns = PRIMARY_KEY_COLUMNS.get(); + boolean isPrimaryKeyColumn = pkColumns != null && pkColumns.contains(column.getName()); + + if (column.isNullable() && !isUnsupportedNullableType(columnType) && !isPrimaryKeyColumn) { + columnType = "Nullable(" + columnType + ")"; + } + return String.format( "`%s` %s %s", column.getName(), @@ -49,6 +85,10 @@ public String columnToConnectorType(Column column) { + "'"); } + private static boolean isUnsupportedNullableType(String columnType) { + return columnType.startsWith("Map(") || columnType.startsWith("Array("); + } + public String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) { if (ignoreIfNotExists) { return "DROP TABLE IF EXISTS " diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java index 90122ebd64a2..b2a44e25a16a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java @@ -99,14 +99,16 @@ public void test() { .build(), "clickhouse test table", ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + // Primary key columns (id, age) should NOT be wrapped in Nullable + // because ClickHouse does not allow nullable columns in ORDER BY / PRIMARY KEY Assertions.assertEquals( createTableSql, "CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n" + " `id` Int64 ,`age` Int32 COMMENT 'test comment',\n" - + " `name` String ,\n" - + "`score` Int32 COMMENT '''N''-N',\n" - + "`gender` Int8 ,\n" - + "`create_time` Int64 \n" + + " `name` Nullable(String) ,\n" + + "`score` Nullable(Int32) COMMENT '\''N''-N',\n" + + "`gender` Nullable(Int8) ,\n" + + "`create_time` Nullable(Int64) \n" + ") ENGINE = MergeTree()\n" + "ORDER BY (`id`,`age`)\n" + "PRIMARY KEY (`id`,`age`)\n" diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java index dd019b3d481f..c36b1d652dfb 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,6 +34,8 @@ void returnsReconvertedTypeWhenSinkTypeNotNull() { Column column = mock(Column.class); when(column.getName()).thenReturn("col1"); when(column.getSinkType()).thenReturn("String"); + when(column.isNullable()).thenReturn(false); + when(column.getComment()).thenReturn(""); String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); @@ -44,6 +47,8 @@ void returnsReconvertedTypeWhenSinkTypeIsNull() { Column column = mock(Column.class); when(column.getName()).thenReturn("col1"); when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE); + when(column.isNullable()).thenReturn(false); + when(column.getComment()).thenReturn(""); String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); @@ -56,9 +61,44 @@ void returnsReconvertedTypeWhenTypesNotNull() { when(column.getName()).thenReturn("col1"); when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE); when(column.getSinkType()).thenReturn("String"); + when(column.isNullable()).thenReturn(false); + when(column.getComment()).thenReturn(""); String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); assertEquals("`col1` String ", result); } + + @Test + void wrapsTypeWithNullableWhenColumnIsNullable() { + Column column = mock(Column.class); + when(column.getName()).thenReturn("col1"); + when(column.getSinkType()).thenReturn("String"); + when(column.isNullable()).thenReturn(true); + when(column.getComment()).thenReturn(""); + + String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); + + assertEquals("`col1` Nullable(String) ", result); + } + + @Test + void escapesSingleQuoteAndBackslashInComment() { + Column column = mock(Column.class); + when(column.getName()).thenReturn("col1"); + when(column.getSinkType()).thenReturn("String"); + when(column.isNullable()).thenReturn(false); + when(column.getComment()).thenReturn("O'Reilly \\ path"); + + String result = ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column); + + assertEquals("`col1` String COMMENT 'O''Reilly \\\\ path'", result); + } + + @Test + void throwsExceptionWhenColumnIsNull() { + assertThrows( + NullPointerException.class, + () -> ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null)); + } } diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java index dc351e239fc2..2970700430cd 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Function; +import java.util.regex.Matcher; import java.util.stream.Collectors; @Slf4j @@ -70,7 +71,7 @@ public String getCreateTableSql( template = template.replaceAll( SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(), - primaryKey); + Matcher.quoteReplacement(primaryKey)); SqlTemplate.canHandledByTemplateWithPlaceholder( template, SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(), @@ -80,7 +81,8 @@ public String getCreateTableSql( template = template.replaceAll( - SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey); + SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), + Matcher.quoteReplacement(uniqueKey)); Map columnInTemplate = CreateTableParser.getColumnList(template); template = mergeColumnInTemplate(columnInTemplate, tableSchema, template); @@ -95,20 +97,27 @@ public String getCreateTableSql( // TODO: Remove this compatibility config template = template.replaceAll( - SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table); + SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), + Matcher.quoteReplacement(table)); log.warn( "The variable placeholder `${table_name}` has been marked as deprecated and will be removed soon, please use `${table}`"); } - return template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), database) - .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), table) + return template.replaceAll( + SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), + Matcher.quoteReplacement(database)) .replaceAll( - SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields) + SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), + Matcher.quoteReplacement(table)) + .replaceAll( + SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), + Matcher.quoteReplacement(rowTypeFields)) .replaceAll( SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(), - Objects.isNull(comment) - ? "" - : comment.replace("'", "''").replace("\\", "\\\\")); + Matcher.quoteReplacement( + Objects.isNull(comment) + ? "" + : comment.replace("'", "''").replace("\\", "\\\\"))); } private String mergeColumnInTemplate( diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java index d779cd663f62..20949a1f1d6f 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/catalog/HbaseCatalog.java @@ -104,7 +104,13 @@ public List listTables(String databaseName) @Override public boolean tableExists(TablePath tablePath) throws CatalogException { checkNotNull(tablePath); - return hbaseClient.tableExists(tablePath.getTableName()); + String databaseName = tablePath.getDatabaseName(); + String tableName = tablePath.getTableName(); + String fullTableName = + (databaseName == null || databaseName.isEmpty()) + ? tableName + : databaseName + ":" + tableName; + return hbaseClient.tableExists(fullTableName); } @Override diff --git a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseCatalogTest.java b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseCatalogTest.java new file mode 100644 index 000000000000..d02e2b91a0f5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseCatalogTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hbase; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.hbase.catalog.HbaseCatalog; +import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; + +public class HbaseCatalogTest { + + @Test + public void testTableExistsWithNamespace() throws Exception { + HbaseParameters parameters = + HbaseParameters.builder() + .zookeeperQuorum("localhost") + .namespace("ns1") + .table("tbl") + .build(); + HbaseCatalog catalog = new HbaseCatalog("hbase", "ns1", parameters); + + HbaseClient hbaseClient = Mockito.mock(HbaseClient.class); + Mockito.when(hbaseClient.tableExists("ns1:tbl")).thenReturn(true); + + injectHbaseClient(catalog, hbaseClient); + + TablePath tablePath = TablePath.of("ns1", "tbl"); + Assertions.assertTrue(catalog.tableExists(tablePath)); + Mockito.verify(hbaseClient, Mockito.times(1)).tableExists("ns1:tbl"); + } + + @Test + public void testTableExistsWithoutNamespace() throws Exception { + HbaseParameters parameters = + HbaseParameters.builder() + .zookeeperQuorum("localhost") + .namespace("default") + .table("tbl") + .build(); + HbaseCatalog catalog = new HbaseCatalog("hbase", "default", parameters); + + HbaseClient hbaseClient = Mockito.mock(HbaseClient.class); + Mockito.when(hbaseClient.tableExists("tbl")).thenReturn(true); + + injectHbaseClient(catalog, hbaseClient); + + TablePath tablePath = TablePath.of("tbl"); + Assertions.assertTrue(catalog.tableExists(tablePath)); + Mockito.verify(hbaseClient, Mockito.times(1)).tableExists("tbl"); + } + + private void injectHbaseClient(HbaseCatalog catalog, HbaseClient hbaseClient) throws Exception { + Field clientField = HbaseCatalog.class.getDeclaredField("hbaseClient"); + clientField.setAccessible(true); + clientField.set(catalog, hbaseClient); + } +} diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java index a5b501b2ba76..3475f7f8561e 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergSourceOptions.java @@ -33,50 +33,50 @@ public class IcebergSourceOptions extends IcebergCommonOptions { Options.key("start_snapshot_timestamp") .longType() .noDefaultValue() - .withDescription(" the iceberg timestamp of starting snapshot "); + .withDescription("The iceberg timestamp of starting snapshot."); public static final Option KEY_START_SNAPSHOT_ID = Options.key("start_snapshot_id") .longType() .noDefaultValue() - .withDescription(" the iceberg id of starting snapshot "); + .withDescription("The iceberg id of starting snapshot."); public static final Option KEY_END_SNAPSHOT_ID = Options.key("end_snapshot_id") .longType() .noDefaultValue() - .withDescription(" the iceberg id of ending snapshot "); + .withDescription("The iceberg id of ending snapshot."); public static final Option KEY_USE_SNAPSHOT_ID = Options.key("use_snapshot_id") .longType() .noDefaultValue() - .withDescription(" the iceberg used snapshot id"); + .withDescription("The iceberg used snapshot id."); public static final Option KEY_USE_SNAPSHOT_TIMESTAMP = Options.key("use_snapshot_timestamp") .longType() .noDefaultValue() - .withDescription(" the iceberg used snapshot timestamp"); + .withDescription("The iceberg used snapshot timestamp."); public static final Option KEY_STREAM_SCAN_STRATEGY = Options.key("stream_scan_strategy") .enumType(IcebergStreamScanStrategy.class) .defaultValue(FROM_LATEST_SNAPSHOT) - .withDescription(" the iceberg strategy of stream scanning"); + .withDescription("The iceberg strategy of stream scanning."); public static final Option> KEY_TABLE_LIST = Options.key("table_list") .listType(SourceTableConfig.class) .noDefaultValue() - .withDescription(" the iceberg tables"); + .withDescription("The iceberg tables."); public static final Option KEY_INCREMENT_SCAN_INTERVAL = Options.key("increment.scan-interval") .longType() .defaultValue(2000L) - .withDescription(" the interval of increment scan(mills)"); + .withDescription("The interval of increment scan (mills)."); public static final Option QUERY = - Options.key("query").stringType().noDefaultValue().withDescription("the select sql"); + Options.key("query").stringType().noDefaultValue().withDescription("The select sql."); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 1e8b40ace004..47bd9672f94e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -131,6 +131,46 @@ public void testClickhouseWithCreateSchemaWhenComment(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode()); } + @TestTemplate + public void testClickhouseAutoCreateTableWithSpecialCharactersInComments( + TestContainer testContainer) throws Exception { + String testTableName = "test_special_chars_comments_table"; + + String createSourceTableSql = + String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (" + + "id UInt64, " + + "col_with_dollar_comment String COMMENT 'Comment with $1 and $2 special chars', " + + "col_with_backslash_comment String COMMENT 'Comment with \\\\ backslash', " + + "col_with_mixed_chars String COMMENT '~`!@#$%%^&*()_+-*/-=[]{}', " + + "col_with_chinese_chars String COMMENT '这是特殊符号测试英文键盘:~`!@#$%%^&*()_+-*/-=[]{}'" + + ") ENGINE = MergeTree() ORDER BY id", + DATABASE, testTableName); + + String sinkTableName = testTableName + "_sink"; + + try (Statement statement = connection.createStatement()) { + statement.execute(createSourceTableSql); + + String insertSql = + String.format( + "INSERT INTO %s.%s VALUES " + + "(1, 'value1', 'value2', 'value3', 'value4')", + DATABASE, testTableName); + statement.execute(insertSql); + } + + Container.ExecResult execResult = + testContainer.executeJob("/clickhouse_auto_create_with_special_comments.conf"); + + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + Assertions.assertEquals(1, countData(sinkTableName)); + + dropTable(DATABASE + "." + testTableName); + dropTable(DATABASE + "." + sinkTableName); + } + @TestTemplate public void clickhouseWithCreateSchemaWhenNotExist(TestContainer container) throws Exception { String tableName = "default.sink_table_for_schema"; @@ -206,7 +246,7 @@ public void clickhouseRecreateSchemaAndCustom(TestContainer container) throws Ex String tableName = "default.sink_table_for_schema"; Container.ExecResult execResult = container.executeJob("/clickhouse_with_recreate_schema_and_custom.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStdout()); Assertions.assertEquals(101, countData(tableName)); dropTable(tableName); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_auto_create_with_special_comments.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_auto_create_with_special_comments.conf new file mode 100644 index 000000000000..3b5c7a3039c0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_auto_create_with_special_comments.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file tests auto create table with special characters in column comments +###### Testing regex special characters like $ and \ are properly handled by Matcher.quoteReplacement +###### + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 10 +} + +source { + Clickhouse { + host = "clickhouse:8123" + table_path = "default.test_special_chars_comments_table" + sql = "select * from default.test_special_chars_comments_table" + username = "default" + password = "" + plugin_output = "source_table" + } +} + +sink { + Clickhouse { + host = "clickhouse:8123" + database = "default" + table = "test_special_chars_comments_table_sink" + username = "default" + password = "" + "schema_save_mode" = "CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode" = "APPEND_DATA" + "save_mode_create_template" = """ + CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( + ${rowtype_fields} + ) ENGINE = MergeTree() + ORDER BY (id) + COMMENT '${comment}'; + """ + support_upsert = true + allow_experimental_lightweight_delete = true + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java index bf51595b615f..86f1a5947d9a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java @@ -300,7 +300,7 @@ public void makeJobFailing(Throwable e) { updateJobState(JobStatus.FAILING); } - public void startJob() { + public synchronized void startJob() { isRunning = true; log.info("{} state process is start", getJobFullName()); updateJobState(JobStatus.SCHEDULED);