Skip to content

Commit eae753f

Browse files
authored
Fix IntoOperator order bug
1 parent ef0e45e commit eae753f

File tree

7 files changed

+110
-43
lines changed

7 files changed

+110
-43
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/insertquery/IoTDBInsertQueryIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Locale;
5050

5151
import static org.junit.Assert.assertEquals;
52+
import static org.junit.Assert.assertFalse;
5253
import static org.junit.Assert.assertNotNull;
5354
import static org.junit.Assert.assertNull;
5455
import static org.junit.Assert.fail;
@@ -520,6 +521,38 @@ public void testQueryStatement() throws IoTDBConnectionException, StatementExecu
520521
}
521522
} finally {
522523
session.executeNonQueryStatement(String.format(dropTableTemplate, 1));
524+
session.close();
525+
}
526+
}
527+
528+
@Test
529+
public void testDeviceTagLast() {
530+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
531+
session.executeNonQueryStatement("use test");
532+
session.executeNonQueryStatement(
533+
"create table source(s1 boolean field, device_id string tag)");
534+
session.executeNonQueryStatement(
535+
"insert into source(time, device_id, s1) values(1, 'd1', false)");
536+
session.executeNonQueryStatement(
537+
"create table target(device_id string tag, s1 boolean field)");
538+
539+
session.executeNonQueryStatement(
540+
"INSERT INTO target(time, device_id, s1) SELECT time, device_id, s1 FROM source");
541+
try (SessionDataSet dataSet =
542+
session.executeQueryStatement("SELECT time, device_id, s1 FROM target")) {
543+
assertEquals(3, dataSet.getColumnNames().size());
544+
SessionDataSet.DataIterator iterator = dataSet.iterator();
545+
int count = 0;
546+
while (iterator.next()) {
547+
count++;
548+
assertEquals(1, iterator.getInt(1));
549+
assertEquals("d1", iterator.getString(2));
550+
assertFalse(iterator.getBoolean(3));
551+
}
552+
assertEquals(1, count);
553+
}
554+
} catch (Exception e) {
555+
fail(e.getMessage());
523556
}
524557
}
525558

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3697,7 +3697,22 @@ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
36973697
List<TSDataType> inputColumnTypes = new ArrayList<>();
36983698
List<TsTableColumnCategory> inputColumnCategories = new ArrayList<>();
36993699

3700-
List<ColumnSchema> inputColumns = node.getColumns();
3700+
List<ColumnSchema> originColumns = node.getColumns();
3701+
List<Symbol> originInputColumnNames = node.getNeededInputColumnNames();
3702+
int size = originColumns.size();
3703+
List<ColumnSchema> inputColumns = new ArrayList<>(size);
3704+
3705+
List<Symbol> childOutputName = node.getChild().getOutputSymbols();
3706+
Map<Symbol, Integer> map = new HashMap<>(childOutputName.size());
3707+
for (int i = 0; i < size; i++) {
3708+
map.put(childOutputName.get(i), i);
3709+
inputColumns.add(null);
3710+
}
3711+
for (int i = 0; i < size; i++) {
3712+
int index = map.get(originInputColumnNames.get(i));
3713+
inputColumns.set(index, originColumns.get(i));
3714+
}
3715+
37013716
for (int i = 0; i < inputColumns.size(); i++) {
37023717
String columnName = inputColumns.get(i).getName();
37033718
inputLocationMap.put(columnName, new InputLocation(0, i));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,9 @@
7171
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
7272
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
7373
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
74-
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
7574
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
7675
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert;
7776
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
78-
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
7977
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
8078
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
8179
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
@@ -91,12 +89,10 @@
9189
import org.apache.tsfile.enums.TSDataType;
9290
import org.apache.tsfile.read.common.type.LongType;
9391
import org.apache.tsfile.read.common.type.StringType;
94-
import org.apache.tsfile.read.common.type.Type;
9592
import org.apache.tsfile.read.common.type.TypeFactory;
9693

9794
import java.util.ArrayList;
9895
import java.util.List;
99-
import java.util.Map;
10096
import java.util.Objects;
10197
import java.util.Optional;
10298

@@ -265,50 +261,28 @@ private RelationPlan genInsertPlan(final Analysis analysis, final Insert node) {
265261
TableMetadataImpl.throwTableNotExistsException(
266262
targetTable.getDatabaseName(), targetTable.getObjectName());
267263
}
268-
List<ColumnSchema> tableColumns = tableSchema.get().getColumns();
269-
Map<String, ColumnSchema> columnSchemaMap = tableSchema.get().getColumnSchemaMap();
270264

271265
// insert columns
272266
Analysis.Insert insert = analysis.getInsert();
273267
List<ColumnSchema> insertColumns = insert.getColumns();
274268

275-
// prepare Assignments and ColumnSchema builder
276269
Assignments.Builder assignments = Assignments.builder();
277-
ImmutableList.Builder<ColumnSchema> insertedColumnsBuilder = ImmutableList.builder();
278-
279-
// insert null if table column is not in query columns.
280-
for (ColumnSchema column : tableColumns) {
281-
if (column.isHidden()) {
282-
continue;
283-
}
284-
Symbol output = symbolAllocator.newSymbol(column.getName(), column.getType());
285-
Expression expression;
286-
Type tableType = column.getType();
287-
int index = insertColumns.indexOf(columnSchemaMap.get(column.getName()));
288-
if (index < 0) {
289-
expression = new NullLiteral();
290-
} else {
291-
Symbol input = visibleFieldMappings.get(index);
292-
Type queryType = symbolAllocator.getTypes().getTableModelType(input);
293-
if (!queryType.equals(tableType)) {
294-
throw new SemanticException(
295-
String.format(
296-
"Insert query has mismatched column type: Table: [%s], Query: [%s]",
297-
tableType, queryType));
298-
}
299-
expression = input.toSymbolReference();
300-
}
301-
assignments.put(output, expression);
302-
insertedColumnsBuilder.add(column);
270+
List<Symbol> neededInputColumnNames = new ArrayList<>(insertColumns.size());
271+
272+
for (int i = 0, size = insertColumns.size(); i < size; i++) {
273+
Symbol output =
274+
symbolAllocator.newSymbol(insertColumns.get(i).getName(), insertColumns.get(i).getType());
275+
Symbol input = visibleFieldMappings.get(i);
276+
neededInputColumnNames.add(output);
277+
assignments.put(output, input.toSymbolReference());
303278
}
304279

305280
// Project Node
306281
ProjectNode projectNode =
307282
new ProjectNode(
308283
queryContext.getQueryId().genPlanNodeId(), plan.getRoot(), assignments.build());
309-
List<ColumnSchema> insertedColumns = insertedColumnsBuilder.build();
310284
List<Field> fields =
311-
insertedColumns.stream()
285+
insertColumns.stream()
312286
.map(
313287
column ->
314288
Field.newUnqualified(
@@ -325,7 +299,8 @@ private RelationPlan genInsertPlan(final Analysis analysis, final Insert node) {
325299
plan.getRoot(),
326300
targetTable.getDatabaseName(),
327301
table.getName().getSuffix(),
328-
tableColumns,
302+
insertColumns,
303+
neededInputColumnNames,
329304
symbolAllocator.newSymbol(Insert.ROWS, Insert.ROWS_TYPE));
330305
return new RelationPlan(
331306
intoNode, analysis.getRootScope(), intoNode.getOutputSymbols(), Optional.empty());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public List<PlanNode> visitInto(final IntoNode node, final PlanContext context)
228228
node.getDatabase(),
229229
node.getTable(),
230230
node.getColumns(),
231+
node.getNeededInputColumnNames(),
231232
node.getRowCountSymbol());
232233
resultNodeList.add(subIntoNode);
233234
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntoNode.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class IntoNode extends SingleChildProcessNode {
4444
private final String database;
4545
private final String table;
4646
private final List<ColumnSchema> columns;
47+
private final List<Symbol> neededInputColumnNames;
4748
private final Symbol rowCountSymbol;
4849

4950
public IntoNode(
@@ -52,11 +53,17 @@ public IntoNode(
5253
String database,
5354
String table,
5455
List<ColumnSchema> columns,
56+
List<Symbol> neededInputColumnNames,
5557
Symbol rowCountSymbol) {
5658
super(id, child);
5759
this.database = database;
5860
this.table = table;
5961
this.columns = columns;
62+
this.neededInputColumnNames = neededInputColumnNames;
63+
if (columns.size() != neededInputColumnNames.size()) {
64+
throw new IllegalArgumentException(
65+
"insert into table columns's size should be same as query result");
66+
}
6067
this.rowCountSymbol = rowCountSymbol;
6168
}
6269

@@ -67,7 +74,7 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
6774

6875
@Override
6976
public PlanNode clone() {
70-
return new IntoNode(id, null, database, table, columns, rowCountSymbol);
77+
return new IntoNode(id, null, database, table, columns, neededInputColumnNames, rowCountSymbol);
7178
}
7279

7380
@Override
@@ -90,6 +97,9 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
9097
for (ColumnSchema tableColumn : columns) {
9198
ColumnSchema.serialize(tableColumn, byteBuffer);
9299
}
100+
for (Symbol column : neededInputColumnNames) {
101+
Symbol.serialize(column, byteBuffer);
102+
}
93103
}
94104

95105
@Override
@@ -102,6 +112,9 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
102112
for (ColumnSchema tableColumn : columns) {
103113
ColumnSchema.serialize(tableColumn, stream);
104114
}
115+
for (Symbol column : neededInputColumnNames) {
116+
Symbol.serialize(column, stream);
117+
}
105118
}
106119

107120
public static IntoNode deserialize(ByteBuffer byteBuffer) {
@@ -113,14 +126,25 @@ public static IntoNode deserialize(ByteBuffer byteBuffer) {
113126
for (int i = 0; i < columnSize; i++) {
114127
columns.add(ColumnSchema.deserialize(byteBuffer));
115128
}
129+
List<Symbol> neededInputColumnNames = new ArrayList<>(columnSize);
130+
for (int i = 0; i < columnSize; i++) {
131+
neededInputColumnNames.add(Symbol.deserialize(byteBuffer));
132+
}
116133
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
117-
return new IntoNode(planNodeId, null, database, table, columns, rowCountSymbol);
134+
return new IntoNode(
135+
planNodeId, null, database, table, columns, neededInputColumnNames, rowCountSymbol);
118136
}
119137

120138
@Override
121139
public PlanNode replaceChildren(List<PlanNode> newChildren) {
122140
return new IntoNode(
123-
id, Iterables.getOnlyElement(newChildren), database, table, columns, rowCountSymbol);
141+
id,
142+
Iterables.getOnlyElement(newChildren),
143+
database,
144+
table,
145+
columns,
146+
neededInputColumnNames,
147+
rowCountSymbol);
124148
}
125149

126150
@Override
@@ -143,12 +167,14 @@ public boolean equals(Object o) {
143167
return database.equals(that.database)
144168
&& table.equals(that.table)
145169
&& rowCountSymbol.equals(that.rowCountSymbol)
146-
&& Objects.deepEquals(columns, that.columns);
170+
&& Objects.deepEquals(columns, that.columns)
171+
&& Objects.deepEquals(neededInputColumnNames, that.neededInputColumnNames);
147172
}
148173

149174
@Override
150175
public int hashCode() {
151-
return Objects.hash(super.hashCode(), database, table, rowCountSymbol, columns);
176+
return Objects.hash(
177+
super.hashCode(), database, table, rowCountSymbol, columns, neededInputColumnNames);
152178
}
153179

154180
public List<Type> getOutputType() {
@@ -167,6 +193,10 @@ public List<ColumnSchema> getColumns() {
167193
return columns;
168194
}
169195

196+
public List<Symbol> getNeededInputColumnNames() {
197+
return neededInputColumnNames;
198+
}
199+
170200
public Symbol getRowCountSymbol() {
171201
return rowCountSymbol;
172202
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,9 +411,18 @@ public PlanAndMappings visitLimit(LimitNode node, UnaliasContext context) {
411411
@Override
412412
public PlanAndMappings visitInto(IntoNode node, UnaliasContext context) {
413413
PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
414+
Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings());
415+
SymbolMapper mapper = symbolMapper(mapping);
414416

415417
return new PlanAndMappings(
416-
node.replaceChildren(ImmutableList.of(rewrittenSource.getRoot())),
418+
new IntoNode(
419+
node.getPlanNodeId(),
420+
rewrittenSource.root,
421+
node.getDatabase(),
422+
node.getTable(),
423+
node.getColumns(),
424+
mapper.map(node.getNeededInputColumnNames()),
425+
node.getRowCountSymbol()),
417426
rewrittenSource.getMappings());
418427
}
419428

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/IntoNodeSerdeTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,16 @@ public void testTableIntoSerde() throws IllegalPathException, IOException {
110110
new ColumnSchema("id", StringType.STRING, false, TsTableColumnCategory.TAG),
111111
new ColumnSchema("voltage", FloatType.FLOAT, false, TsTableColumnCategory.FIELD));
112112

113+
List<Symbol> neededInputColumnNames =
114+
ImmutableList.of(new Symbol("time"), new Symbol("voltage"), new Symbol("id"));
115+
113116
return new org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode(
114117
new PlanNodeId("TestIntoNode"),
115118
null,
116119
"testdb",
117120
"testtb",
118121
sourceColumns,
122+
neededInputColumnNames,
119123
new Symbol("rows"));
120124
}
121125
}

0 commit comments

Comments
 (0)