Skip to content

Commit f372e51

Browse files
authored
[To dev/1.3] Improve DeviceViewIntoOperator's return style to pipeline (#16980)
1 parent edca502 commit f372e51

File tree

11 files changed

+175
-125
lines changed

11 files changed

+175
-125
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -623,89 +623,89 @@ public void testDataTypeIncompatible() {
623623
// test INT32
624624
assertTestFail(
625625
"select s_int32 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
626-
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
626+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT32, timestamp 0, value 0]");
627627
assertTestFail(
628628
"select s_int32 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
629-
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int32[INT32]).");
629+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT32, timestamp 0, value 0]");
630630

631631
// test INT64
632632
assertTestFail(
633633
"select s_int64 into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
634-
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
634+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type INT64, timestamp 0, value 0]");
635635
assertTestFail(
636636
"select s_int64 into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
637-
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
637+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type INT64, timestamp 0, value 0]");
638638
assertTestFail(
639639
"select s_int64 into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
640-
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
640+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type INT64, timestamp 0, value 0]");
641641
assertTestFail(
642642
"select s_int64 into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
643-
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_int64[INT64]).");
643+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type INT64, timestamp 0, value 0]");
644644

645645
// test FLOAT
646646
assertTestFail(
647647
"select s_float into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
648-
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
648+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type FLOAT, timestamp 0, value 0.0]");
649649
assertTestFail(
650650
"select s_float into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
651-
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
651+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type FLOAT, timestamp 0, value 0.0]");
652652
assertTestFail(
653653
"select s_float into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
654-
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
654+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type FLOAT, timestamp 0, value 0.0]");
655655
assertTestFail(
656656
"select s_float into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
657-
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_float[FLOAT]).");
657+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type FLOAT, timestamp 0, value 0.0]");
658658

659659
// test DOUBLE
660660
assertTestFail(
661661
"select s_double into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
662-
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
662+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type DOUBLE, timestamp 0, value 0.0]");
663663
assertTestFail(
664664
"select s_double into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
665-
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
665+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type DOUBLE, timestamp 0, value 0.0]");
666666
assertTestFail(
667667
"select s_double into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
668-
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
668+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type DOUBLE, timestamp 0, value 0.0]");
669669
assertTestFail(
670670
"select s_double into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
671-
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
671+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type DOUBLE, timestamp 0, value 0.0]");
672672
assertTestFail(
673673
"select s_double into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
674-
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_double[DOUBLE]).");
674+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type DOUBLE, timestamp 0, value 0.0]");
675675

676676
// test BOOLEAN
677677
assertTestFail(
678678
"select s_boolean into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
679-
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
679+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type BOOLEAN, timestamp 0, value true]");
680680
assertTestFail(
681681
"select s_boolean into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
682-
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
682+
"301: Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type BOOLEAN, timestamp 0, value true]");
683683
assertTestFail(
684684
"select s_boolean into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
685-
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
685+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type BOOLEAN, timestamp 0, value true]");
686686
assertTestFail(
687687
"select s_boolean into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
688-
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
688+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type BOOLEAN, timestamp 0, value true]");
689689
assertTestFail(
690690
"select s_boolean into root.sg_type.d_1(s_text) from root.sg_type.d_0;",
691-
"The data type of target path (root.sg_type.d_1.s_text[TEXT]) is not compatible with the data type of source column (root.sg_type.d_0.s_boolean[BOOLEAN]).");
691+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_text] caused by [data type of root.sg_type.d_1.s_text is not consistent, registered type TEXT, inserting type BOOLEAN, timestamp 0, value true]");
692692

693693
// test TEXT
694694
assertTestFail(
695695
"select s_text into root.sg_type.d_1(s_int32) from root.sg_type.d_0;",
696-
"The data type of target path (root.sg_type.d_1.s_int32[INT32]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
696+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int32] caused by [data type of root.sg_type.d_1.s_int32 is not consistent, registered type INT32, inserting type TEXT, timestamp 0, value text0]");
697697
assertTestFail(
698698
"select s_text into root.sg_type.d_1(s_int64) from root.sg_type.d_0;",
699-
"The data type of target path (root.sg_type.d_1.s_int64[INT64]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
699+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_int64] caused by [data type of root.sg_type.d_1.s_int64 is not consistent, registered type INT64, inserting type TEXT, timestamp 0, value text0]");
700700
assertTestFail(
701701
"select s_text into root.sg_type.d_1(s_float) from root.sg_type.d_0;",
702-
"The data type of target path (root.sg_type.d_1.s_float[FLOAT]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
702+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_float] caused by [data type of root.sg_type.d_1.s_float is not consistent, registered type FLOAT, inserting type TEXT, timestamp 0, value text0]");
703703
assertTestFail(
704704
"select s_text into root.sg_type.d_1(s_double) from root.sg_type.d_0;",
705-
"The data type of target path (root.sg_type.d_1.s_double[DOUBLE]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
705+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_double] caused by [data type of root.sg_type.d_1.s_double is not consistent, registered type DOUBLE, inserting type TEXT, timestamp 0, value text0]");
706706
assertTestFail(
707707
"select s_text into root.sg_type.d_1(s_boolean) from root.sg_type.d_0;",
708-
"The data type of target path (root.sg_type.d_1.s_boolean[BOOLEAN]) is not compatible with the data type of source column (root.sg_type.d_0.s_text[TEXT]).");
708+
"Error occurred while inserting tablets in SELECT INTO: Fail to insert measurements [s_boolean] caused by [data type of root.sg_type.d_1.s_boolean is not consistent, registered type BOOLEAN, inserting type TEXT, timestamp 0, value text0]");
709709
}
710710

711711
@Test

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.tsfile.common.conf.TSFileDescriptor;
3939
import org.apache.tsfile.enums.TSDataType;
4040
import org.apache.tsfile.read.common.block.TsBlock;
41+
import org.apache.tsfile.read.common.block.TsBlockBuilder;
4142
import org.apache.tsfile.read.common.type.Type;
4243
import org.apache.tsfile.read.common.type.TypeFactory;
4344
import org.apache.tsfile.utils.Binary;
@@ -82,18 +83,22 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
8283
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
8384
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
8485

86+
protected final TsBlockBuilder resultTsBlockBuilder;
87+
8588
protected AbstractIntoOperator(
8689
OperatorContext operatorContext,
8790
Operator child,
8891
List<TSDataType> inputColumnTypes,
8992
ExecutorService intoOperationExecutor,
90-
long statementSizePerLine) {
93+
long statementSizePerLine,
94+
List<TSDataType> outputDataTypes) {
9195
this.operatorContext = operatorContext;
9296
this.child = child;
9397
this.typeConvertors =
9498
inputColumnTypes.stream().map(TypeFactory::getType).collect(Collectors.toList());
9599

96100
this.writeOperationExecutor = intoOperationExecutor;
101+
this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
97102
initMemoryEstimates(statementSizePerLine);
98103
}
99104

@@ -152,15 +157,15 @@ public TsBlock next() throws Exception {
152157
checkLastWriteOperation();
153158

154159
if (!processTsBlock(cachedTsBlock)) {
155-
return null;
160+
return tryToReturnPartialResult();
156161
}
157162
cachedTsBlock = null;
158163
if (child.hasNextWithTimer()) {
159164
TsBlock inputTsBlock = child.nextWithTimer();
160165
processTsBlock(inputTsBlock);
161166

162167
// call child.next only once
163-
return null;
168+
return tryToReturnPartialResult();
164169
} else {
165170
return tryToReturnResultTsBlock();
166171
}
@@ -218,6 +223,8 @@ private void checkLastWriteOperation() {
218223

219224
protected abstract TsBlock tryToReturnResultTsBlock();
220225

226+
protected abstract TsBlock tryToReturnPartialResult();
227+
221228
protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
222229
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
223230
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
@@ -286,7 +293,7 @@ protected void executeInsertMultiTabletsStatement(
286293
() -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor);
287294
}
288295

289-
private boolean existFullStatement(
296+
protected boolean existFullStatement(
290297
List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
291298
for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
292299
if (generator.isFull()) {
@@ -549,6 +556,10 @@ public String getDevice() {
549556
return devicePath.toString();
550557
}
551558

559+
public int getRowCount() {
560+
return rowCount;
561+
}
562+
552563
public int getWrittenCount(String measurement) {
553564
if (!writtenCounter.containsKey(measurement)) {
554565
return -1;

0 commit comments

Comments
 (0)