Skip to content

Commit 764cedc

Browse files
authored
Pipe: Fixed the potentially missing database auto-create in receiver (apache#16529)
* fix * fix
1 parent dbdc4b4 commit 764cedc

File tree

2 files changed

+4
-10
lines changed

2 files changed

+4
-10
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
9898
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
9999
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
100+
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
100101
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
101102
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
102103
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -126,7 +127,6 @@
126127
import java.util.Objects;
127128
import java.util.Optional;
128129
import java.util.Set;
129-
import java.util.concurrent.ConcurrentHashMap;
130130
import java.util.concurrent.ExecutionException;
131131
import java.util.concurrent.atomic.AtomicLong;
132132
import java.util.concurrent.atomic.AtomicReference;
@@ -174,8 +174,6 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
174174

175175
private final PipeTransferSliceReqHandler sliceReqHandler = new PipeTransferSliceReqHandler();
176176

177-
private static final Set<String> ALREADY_CREATED_TABLE_MODEL_DATABASES =
178-
ConcurrentHashMap.newKeySet();
179177
private final SqlParser tableSqlParser = new SqlParser();
180178

181179
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
@@ -581,7 +579,7 @@ private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbso
581579
statement.setDeleteAfterLoad(true);
582580
statement.setConvertOnTypeMismatch(true);
583581
statement.setVerifySchema(validateTsFile.get());
584-
statement.setAutoCreateDatabase(false);
582+
statement.setAutoCreateDatabase(true);
585583
statement.setDatabase(dataBaseName);
586584

587585
return executeStatementAndClassifyExceptions(statement);
@@ -966,8 +964,6 @@ private TSStatus executeStatementForTableModel(
966964
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
967965
.status;
968966
} catch (final Exception e) {
969-
ALREADY_CREATED_TABLE_MODEL_DATABASES.remove(databaseName);
970-
971967
final Throwable rootCause = getRootCause(e);
972968
if (rootCause.getMessage() != null
973969
&& rootCause
@@ -997,7 +993,7 @@ private TSStatus executeStatementForTableModel(
997993
}
998994

999995
private void autoCreateDatabaseIfNecessary(final String database) {
1000-
if (ALREADY_CREATED_TABLE_MODEL_DATABASES.contains(database)
996+
if (DataNodeTableCache.getInstance().isDatabaseExist(database)
1001997
|| !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
1002998
return;
1003999
}
@@ -1026,8 +1022,6 @@ private void autoCreateDatabaseIfNecessary(final String database) {
10261022
}
10271023
throw new PipeException("Auto create database failed because: " + e.getMessage());
10281024
}
1029-
1030-
ALREADY_CREATED_TABLE_MODEL_DATABASES.add(database);
10311025
}
10321026

10331027
private TSStatus executeStatementForTreeModel(final Statement statement) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public class CommonConfig {
209209
// Sequentially poll the tsFile by default
210210
private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 1;
211211
private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
212-
private boolean skipFailedTableSchemaCheck = true;
212+
private boolean skipFailedTableSchemaCheck = false;
213213

214214
/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
215215
private int pipeSubtaskExecutorMaxThreadNum =

0 commit comments

Comments
 (0)