Skip to content

Commit 677f9b8

Browse files
authored
Load: Added "skipFailedTableSchemaCheck" parameter (apache#16522)
* fix fix * tested
1 parent 5e00ed5 commit 677f9b8

File tree

5 files changed

+46
-12
lines changed

5 files changed

+46
-12
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.exception.IllegalPathException;
2323
import org.apache.iotdb.commons.path.PatternTreeMap;
24+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2425
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
2526
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
2627
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -146,7 +147,18 @@ public void autoCreateAndVerify(final IDeviceID device) throws LoadAnalyzeExcept
146147
e);
147148
}
148149

149-
createTableAndDatabaseIfNecessary(device.getTableName());
150+
try {
151+
createTableAndDatabaseIfNecessary(device.getTableName());
152+
} catch (final Exception e) {
153+
if (PipeConfig.getInstance().isSkipFailedTableSchemaCheck()) {
154+
LOGGER.info(
155+
"Failed to check table schema, will skip because skipFailedTableSchemaCheck is set to true, message: {}",
156+
e.getMessage());
157+
} else {
158+
throw e;
159+
}
160+
}
161+
150162
// TODO: add permission check and record auth cost
151163
addDevice(device);
152164
if (shouldFlushDevices()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,37 +132,37 @@ public Optional<TableSchema> validateTableHeaderSchema(
132132
DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table);
133133
// If table with this name already exists and isStrictTagColumn is true, make sure the
134134
// existing
135-
// id columns are the prefix of the incoming id columns, or vice versa
135+
// id columns are the prefix of the incoming tag columns, or vice versa
136136
if (isStrictTagColumn) {
137137
final List<TsTableColumnSchema> realTagColumns = table.getTagColumnSchemaList();
138138
final List<ColumnSchema> incomingTagColumns = tableSchema.getTagColumns();
139139
if (realTagColumns.size() <= incomingTagColumns.size()) {
140-
// When incoming table has more ID columns, the existing id columns
141-
// should be the prefix of the incoming id columns (or equal)
140+
// When incoming table has more TAG columns, the existing tag columns
141+
// should be the prefix of the incoming tag columns (or equal)
142142
for (int indexReal = 0; indexReal < realTagColumns.size(); indexReal++) {
143143
final String tagName = realTagColumns.get(indexReal).getColumnName();
144144
final int indexIncoming = tableSchema.getIndexAmongTagColumns(tagName);
145145
if (indexIncoming != indexReal) {
146146
throw new LoadAnalyzeTableColumnDisorderException(
147147
String.format(
148-
"Can not create table because incoming table has no less id columns than existing table, "
149-
+ "and the existing id columns are not the prefix of the incoming id columns. "
150-
+ "Existing id column: %s, index in existing table: %s, index in incoming table: %s",
148+
"Can not create table because incoming table has no less tag columns than existing table, "
149+
+ "and the existing tag columns are not the prefix of the incoming tag columns. "
150+
+ "Existing tag column: %s, index in existing table: %s, index in incoming table: %s",
151151
tagName, indexReal, indexIncoming));
152152
}
153153
}
154154
} else {
155-
// When existing table has more ID columns, the incoming id columns
156-
// should be the prefix of the existing id columns
155+
// When existing table has more TAG columns, the incoming tag columns
156+
// should be the prefix of the existing tag columns
157157
for (int indexIncoming = 0; indexIncoming < incomingTagColumns.size(); indexIncoming++) {
158158
final String tagName = incomingTagColumns.get(indexIncoming).getName();
159159
final int indexReal = table.getTagColumnOrdinal(tagName);
160160
if (indexReal != indexIncoming) {
161161
throw new LoadAnalyzeTableColumnDisorderException(
162162
String.format(
163-
"Can not create table because existing table has more id columns than incoming table, "
164-
+ "and the incoming id columns are not the prefix of the existing id columns. "
165-
+ "Incoming id column: %s, index in existing table: %s, index in incoming table: %s",
163+
"Can not create table because existing table has more tag columns than incoming table, "
164+
+ "and the incoming tag columns are not the prefix of the existing tag columns. "
165+
+ "Incoming tag column: %s, index in existing table: %s, index in incoming table: %s",
166166
tagName, indexReal, indexIncoming));
167167
}
168168
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ public class CommonConfig {
209209
// Sequentially poll the tsFile by default
210210
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
211211
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
212+
private boolean skipFailedTableSchemaCheck = true;
212213

213214
/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
214215
private int pipeSubtaskExecutorMaxThreadNum =
@@ -1522,6 +1523,18 @@ public void setPipeRealTimeQueueMaxWaitingTsFileSize(int pipeRealTimeQueueMaxWai
15221523
pipeRealTimeQueueMaxWaitingTsFileSize);
15231524
}
15241525

1526+
public boolean isSkipFailedTableSchemaCheck() {
1527+
return skipFailedTableSchemaCheck;
1528+
}
1529+
1530+
public void setSkipFailedTableSchemaCheck(boolean skipFailedTableSchemaCheck) {
1531+
if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
1532+
return;
1533+
}
1534+
this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
1535+
logger.info("skipFailedTableSchemaCheck is set to {}.", skipFailedTableSchemaCheck);
1536+
}
1537+
15251538
public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
15261539
if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
15271540
return;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
119119
return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
120120
}
121121

122+
public boolean isSkipFailedTableSchemaCheck() {
123+
return COMMON_CONFIG.isSkipFailedTableSchemaCheck();
124+
}
125+
122126
/////////////////////////////// Subtask Executor ///////////////////////////////
123127

124128
public int getPipeSubtaskExecutorMaxThreadNum() {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
278278
properties.getProperty(
279279
"pipe_realTime_queue_max_waiting_tsFile_size",
280280
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
281+
config.setSkipFailedTableSchemaCheck(
282+
Boolean.parseBoolean(
283+
properties.getProperty(
284+
"skip_failed_table_schema_check",
285+
String.valueOf(config.isSkipFailedTableSchemaCheck()))));
281286
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
282287
Integer.parseInt(
283288
properties.getProperty(

0 commit comments

Comments
 (0)