Skip to content

Commit ad09bd3

Browse files
Pipe: Support table statements' data type conversion on receiver type mismatch (apache#14354)
1 parent 761c67e commit ad09bd3

File tree

9 files changed

+314
-24
lines changed

9 files changed

+314
-24
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@
6161
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
6262
import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
6363
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
64-
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor;
6564
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
6665
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
6766
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
6867
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
68+
import org.apache.iotdb.db.pipe.receiver.visitor.PipeTableStatementDataTypeConvertExecutionVisitor;
69+
import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor;
6970
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
7071
import org.apache.iotdb.db.protocol.session.IClientSession;
7172
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -144,9 +145,13 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
144145
new PipeStatementExceptionVisitor();
145146
private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR =
146147
new PipeStatementPatternParseVisitor();
147-
private final PipeStatementDataTypeConvertExecutionVisitor
148-
statementDataTypeConvertExecutionVisitor =
149-
new PipeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
148+
private final PipeTableStatementDataTypeConvertExecutionVisitor
149+
tableStatementDataTypeConvertExecutionVisitor =
150+
new PipeTableStatementDataTypeConvertExecutionVisitor(
151+
this::executeStatementForTableModel);
152+
private final PipeTreeStatementDataTypeConvertExecutionVisitor
153+
treeStatementDataTypeConvertExecutionVisitor =
154+
new PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
150155
private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor();
151156

152157
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster
@@ -529,6 +534,9 @@ private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> a
529534
}
530535

531536
final String loadActiveListeningPipeDir = IOTDB_CONFIG.getLoadActiveListeningPipeDir();
537+
if (Objects.isNull(loadActiveListeningPipeDir)) {
538+
throw new PipeException("Load active listening pipe dir is not set.");
539+
}
532540

533541
for (final String absolutePath : absolutePaths) {
534542
if (absolutePath == null) {
@@ -782,18 +790,18 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
782790
? executeStatementForTableModel(statement, dataBaseName)
783791
: executeStatementForTreeModel(statement);
784792

785-
// The following code is used to handle the data type mismatch exception
786-
// Data type conversion is not supported for table model statements
787-
if (isTableModelStatement) {
788-
return status;
789-
}
790793
// Try to convert data type if the statement is a tree model statement
791794
// and the status code is not success
792795
return shouldConvertDataTypeOnTypeMismatch
793796
&& ((statement instanceof InsertBaseStatement
794797
&& ((InsertBaseStatement) statement).hasFailedMeasurements())
795798
|| status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
796-
? statement.accept(statementDataTypeConvertExecutionVisitor, status).orElse(status)
799+
? (isTableModelStatement
800+
? statement
801+
.accept(
802+
tableStatementDataTypeConvertExecutionVisitor, new Pair<>(status, dataBaseName))
803+
.orElse(status)
804+
: statement.accept(treeStatementDataTypeConvertExecutionVisitor, status).orElse(status))
797805
: status;
798806
}
799807

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,16 @@ public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStateme
4848
measurementSchemas = insertRowStatement.getMeasurementSchemas();
4949
measurements = insertRowStatement.getMeasurements();
5050
dataTypes = insertRowStatement.getDataTypes();
51+
columnCategories = insertRowStatement.getColumnCategories();
52+
idColumnIndices = insertRowStatement.getIdColumnIndices();
53+
attrColumnIndices = insertRowStatement.getAttrColumnIndices();
54+
writeToTable = insertRowStatement.isWriteToTable();
55+
databaseName = insertRowStatement.getDatabaseName().orElse(null);
5156
// InsertRowStatement
5257
time = insertRowStatement.getTime();
5358
values = insertRowStatement.getValues();
5459
isNeedInferType = insertRowStatement.isNeedInferType();
60+
deviceID = insertRowStatement.getRawTableDeviceID();
5561
}
5662

5763
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,17 @@ public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabl
4242
measurementSchemas = insertTabletStatement.getMeasurementSchemas();
4343
measurements = insertTabletStatement.getMeasurements();
4444
dataTypes = insertTabletStatement.getDataTypes();
45+
columnCategories = insertTabletStatement.getColumnCategories();
46+
idColumnIndices = insertTabletStatement.getIdColumnIndices();
47+
attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
48+
writeToTable = insertTabletStatement.isWriteToTable();
49+
databaseName = insertTabletStatement.getDatabaseName().orElse(null);
4550
// InsertTabletStatement
4651
times = insertTabletStatement.getTimes();
4752
nullBitMaps = insertTabletStatement.getBitMaps();
4853
columns = insertTabletStatement.getColumns();
54+
deviceIDs = insertTabletStatement.getRawTableDeviceIDs();
55+
singleDevice = insertTabletStatement.isSingleDevice();
4956
rowCount = insertTabletStatement.getRowCount();
5057
}
5158

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.receiver.visitor;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
24+
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
25+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
26+
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
27+
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
28+
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
29+
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
30+
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
31+
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
32+
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
33+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
34+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
35+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
36+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
37+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
38+
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
39+
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
40+
import org.apache.iotdb.rpc.TSStatusCode;
41+
42+
import org.apache.commons.io.FileUtils;
43+
import org.apache.tsfile.utils.Pair;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
47+
import java.io.File;
48+
import java.util.Objects;
49+
import java.util.Optional;
50+
import java.util.stream.Collectors;
51+
52+
/**
53+
* This visitor transforms the data type of the statement when the statement is executed and an
54+
* exception occurs. The transformed statement (if any) is returned and will be executed again.
55+
*/
56+
public class PipeTableStatementDataTypeConvertExecutionVisitor
57+
extends StatementVisitor<Optional<TSStatus>, Pair<TSStatus, String>> {
58+
59+
private static final Logger LOGGER =
60+
LoggerFactory.getLogger(PipeTableStatementDataTypeConvertExecutionVisitor.class);
61+
62+
@FunctionalInterface
63+
public interface StatementExecutor {
64+
// databaseName can NOT be null
65+
TSStatus execute(final Statement statement, final String databaseName);
66+
}
67+
68+
private final StatementExecutor statementExecutor;
69+
70+
public PipeTableStatementDataTypeConvertExecutionVisitor(
71+
final StatementExecutor statementExecutor) {
72+
this.statementExecutor = statementExecutor;
73+
}
74+
75+
private Optional<TSStatus> tryExecute(final Statement statement, final String databaseName) {
76+
try {
77+
if (Objects.isNull(databaseName)) {
78+
LOGGER.warn(
79+
"Database name is unexpectedly null for statement: {}. Skip data type conversion.",
80+
statement);
81+
return Optional.empty();
82+
}
83+
84+
return Optional.of(statementExecutor.execute(statement, databaseName));
85+
} catch (final Exception e) {
86+
LOGGER.warn("Failed to execute statement after data type conversion.", e);
87+
return Optional.empty();
88+
}
89+
}
90+
91+
@Override
92+
public Optional<TSStatus> visitNode(
93+
final StatementNode statementNode, final Pair<TSStatus, String> statusAndDatabaseName) {
94+
return Optional.empty();
95+
}
96+
97+
@Override
98+
public Optional<TSStatus> visitLoadFile(
99+
final LoadTsFileStatement loadTsFileStatement,
100+
final Pair<TSStatus, String> statusAndDatabaseName) {
101+
final TSStatus status = statusAndDatabaseName.getLeft();
102+
final String databaseName = statusAndDatabaseName.getRight();
103+
104+
if (Objects.isNull(databaseName)) {
105+
LOGGER.warn(
106+
"Database name is unexpectedly null for LoadTsFileStatement: {}. Skip data type conversion.",
107+
loadTsFileStatement);
108+
return Optional.empty();
109+
}
110+
111+
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
112+
// Ignore the error if it is caused by insufficient memory
113+
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {
114+
return Optional.empty();
115+
}
116+
117+
LOGGER.warn(
118+
"Data type mismatch detected (TSStatus: {}) for LoadTsFileStatement: {}. Start data type conversion.",
119+
status,
120+
loadTsFileStatement);
121+
122+
for (final File file : loadTsFileStatement.getTsFiles()) {
123+
try (final TsFileInsertionEventTableParser parser =
124+
new TsFileInsertionEventTableParser(
125+
file,
126+
new TablePattern(true, null, null),
127+
Long.MIN_VALUE,
128+
Long.MAX_VALUE,
129+
null,
130+
null)) {
131+
for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) {
132+
if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
133+
continue;
134+
}
135+
final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
136+
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
137+
138+
final PipeConvertedInsertTabletStatement statement =
139+
new PipeConvertedInsertTabletStatement(
140+
PipeTransferTabletRawReq.toTPipeTransferRawReq(
141+
rawTabletInsertionEvent.convertToTablet(),
142+
rawTabletInsertionEvent.isAligned())
143+
.constructStatement());
144+
145+
TSStatus result;
146+
try {
147+
result =
148+
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
149+
statement, statementExecutor.execute(statement, databaseName));
150+
151+
// Retry max 5 times if the write process is rejected
152+
for (int i = 0;
153+
i < 5
154+
&& result.getCode()
155+
== TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
156+
.getStatusCode();
157+
i++) {
158+
Thread.sleep(100L * (i + 1));
159+
result =
160+
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
161+
statement, statementExecutor.execute(statement, databaseName));
162+
}
163+
} catch (final Exception e) {
164+
if (e instanceof InterruptedException) {
165+
Thread.currentThread().interrupt();
166+
}
167+
result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e);
168+
}
169+
170+
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
171+
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
172+
|| result.getCode()
173+
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
174+
return Optional.empty();
175+
}
176+
}
177+
} catch (final Exception e) {
178+
LOGGER.warn(
179+
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
180+
return Optional.empty();
181+
}
182+
}
183+
184+
if (loadTsFileStatement.isDeleteAfterLoad()) {
185+
loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
186+
}
187+
188+
LOGGER.warn(
189+
"Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
190+
191+
return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
192+
}
193+
194+
@Override
195+
public Optional<TSStatus> visitInsertRow(
196+
final InsertRowStatement insertRowStatement,
197+
final Pair<TSStatus, String> statusAndDatabaseName) {
198+
return tryExecute(
199+
new PipeConvertedInsertRowStatement(insertRowStatement), statusAndDatabaseName.getRight());
200+
}
201+
202+
@Override
203+
public Optional<TSStatus> visitInsertRows(
204+
final InsertRowsStatement insertRowsStatement,
205+
final Pair<TSStatus, String> statusAndDatabaseName) {
206+
if (insertRowsStatement.getInsertRowStatementList() == null
207+
|| insertRowsStatement.getInsertRowStatementList().isEmpty()) {
208+
return Optional.empty();
209+
}
210+
211+
final InsertRowsStatement convertedInsertRowsStatement = new InsertRowsStatement();
212+
convertedInsertRowsStatement.setInsertRowStatementList(
213+
insertRowsStatement.getInsertRowStatementList().stream()
214+
.map(PipeConvertedInsertRowStatement::new)
215+
.collect(Collectors.toList()));
216+
return tryExecute(convertedInsertRowsStatement, statusAndDatabaseName.getRight());
217+
}
218+
219+
@Override
220+
public Optional<TSStatus> visitInsertRowsOfOneDevice(
221+
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
222+
final Pair<TSStatus, String> statusAndDatabaseName) {
223+
if (insertRowsOfOneDeviceStatement.getInsertRowStatementList() == null
224+
|| insertRowsOfOneDeviceStatement.getInsertRowStatementList().isEmpty()) {
225+
return Optional.empty();
226+
}
227+
228+
final InsertRowsOfOneDeviceStatement convertedInsertRowsOfOneDeviceStatement =
229+
new InsertRowsOfOneDeviceStatement();
230+
convertedInsertRowsOfOneDeviceStatement.setInsertRowStatementList(
231+
insertRowsOfOneDeviceStatement.getInsertRowStatementList().stream()
232+
.map(PipeConvertedInsertRowStatement::new)
233+
.collect(Collectors.toList()));
234+
return tryExecute(convertedInsertRowsOfOneDeviceStatement, statusAndDatabaseName.getRight());
235+
}
236+
237+
@Override
238+
public Optional<TSStatus> visitInsertTablet(
239+
final InsertTabletStatement insertTabletStatement,
240+
final Pair<TSStatus, String> statusAndDatabaseName) {
241+
return tryExecute(
242+
new PipeConvertedInsertTabletStatement(insertTabletStatement),
243+
statusAndDatabaseName.getRight());
244+
}
245+
246+
@Override
247+
public Optional<TSStatus> visitInsertMultiTablets(
248+
final InsertMultiTabletsStatement insertMultiTabletsStatement,
249+
final Pair<TSStatus, String> statusAndDatabaseName) {
250+
if (insertMultiTabletsStatement.getInsertTabletStatementList() == null
251+
|| insertMultiTabletsStatement.getInsertTabletStatementList().isEmpty()) {
252+
return Optional.empty();
253+
}
254+
255+
final InsertMultiTabletsStatement convertedInsertMultiTabletsStatement =
256+
new InsertMultiTabletsStatement();
257+
convertedInsertMultiTabletsStatement.setInsertTabletStatementList(
258+
insertMultiTabletsStatement.getInsertTabletStatementList().stream()
259+
.map(PipeConvertedInsertTabletStatement::new)
260+
.collect(Collectors.toList()));
261+
return tryExecute(convertedInsertMultiTabletsStatement, statusAndDatabaseName.getRight());
262+
}
263+
}

0 commit comments

Comments
 (0)