Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Locale;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -520,6 +521,38 @@ public void testQueryStatement() throws IoTDBConnectionException, StatementExecu
}
} finally {
session.executeNonQueryStatement(String.format(dropTableTemplate, 1));
session.close();
}
}

@Test
public void testDeviceTagLast() {
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
session.executeNonQueryStatement("use test");
session.executeNonQueryStatement(
"create table source(s1 boolean field, device_id string tag)");
session.executeNonQueryStatement(
"insert into source(time, device_id, s1) values(1, 'd1', false)");
session.executeNonQueryStatement(
"create table target(device_id string tag, s1 boolean field)");

session.executeNonQueryStatement(
"INSERT INTO target(time, device_id, s1) SELECT time, device_id, s1 FROM source");
try (SessionDataSet dataSet =
session.executeQueryStatement("SELECT time, device_id, s1 FROM target")) {
assertEquals(3, dataSet.getColumnNames().size());
SessionDataSet.DataIterator iterator = dataSet.iterator();
int count = 0;
while (iterator.next()) {
count++;
assertEquals(1, iterator.getInt(1));
assertEquals("d1", iterator.getString(2));
assertFalse(iterator.getBoolean(3));
}
assertEquals(1, count);
}
} catch (Exception e) {
fail(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3697,7 +3697,22 @@ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
List<TSDataType> inputColumnTypes = new ArrayList<>();
List<TsTableColumnCategory> inputColumnCategories = new ArrayList<>();

List<ColumnSchema> inputColumns = node.getColumns();
List<ColumnSchema> originColumns = node.getColumns();
List<Symbol> originInputColumnNames = node.getNeededInputColumnNames();
int size = originColumns.size();
List<ColumnSchema> inputColumns = new ArrayList<>(size);

List<Symbol> childOutputName = node.getChild().getOutputSymbols();
Map<Symbol, Integer> map = new HashMap<>(childOutputName.size());
for (int i = 0; i < size; i++) {
map.put(childOutputName.get(i), i);
inputColumns.add(null);
}
for (int i = 0; i < size; i++) {
int index = map.get(originInputColumnNames.get(i));
inputColumns.set(index, originColumns.get(i));
}

for (int i = 0; i < inputColumns.size(); i++) {
String columnName = inputColumns.get(i).getName();
inputLocationMap.put(columnName, new InputLocation(0, i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
Expand All @@ -91,12 +89,10 @@
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.LongType;
import org.apache.tsfile.read.common.type.StringType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -265,50 +261,28 @@ private RelationPlan genInsertPlan(final Analysis analysis, final Insert node) {
TableMetadataImpl.throwTableNotExistsException(
targetTable.getDatabaseName(), targetTable.getObjectName());
}
List<ColumnSchema> tableColumns = tableSchema.get().getColumns();
Map<String, ColumnSchema> columnSchemaMap = tableSchema.get().getColumnSchemaMap();

// insert columns
Analysis.Insert insert = analysis.getInsert();
List<ColumnSchema> insertColumns = insert.getColumns();

// prepare Assignments and ColumnSchema builder
Assignments.Builder assignments = Assignments.builder();
ImmutableList.Builder<ColumnSchema> insertedColumnsBuilder = ImmutableList.builder();

// insert null if table column is not in query columns.
for (ColumnSchema column : tableColumns) {
if (column.isHidden()) {
continue;
}
Symbol output = symbolAllocator.newSymbol(column.getName(), column.getType());
Expression expression;
Type tableType = column.getType();
int index = insertColumns.indexOf(columnSchemaMap.get(column.getName()));
if (index < 0) {
expression = new NullLiteral();
} else {
Symbol input = visibleFieldMappings.get(index);
Type queryType = symbolAllocator.getTypes().getTableModelType(input);
if (!queryType.equals(tableType)) {
throw new SemanticException(
String.format(
"Insert query has mismatched column type: Table: [%s], Query: [%s]",
tableType, queryType));
}
expression = input.toSymbolReference();
}
assignments.put(output, expression);
insertedColumnsBuilder.add(column);
List<Symbol> neededInputColumnNames = new ArrayList<>(insertColumns.size());

for (int i = 0, size = insertColumns.size(); i < size; i++) {
Symbol output =
symbolAllocator.newSymbol(insertColumns.get(i).getName(), insertColumns.get(i).getType());
Symbol input = visibleFieldMappings.get(i);
neededInputColumnNames.add(output);
assignments.put(output, input.toSymbolReference());
}

// Project Node
ProjectNode projectNode =
new ProjectNode(
queryContext.getQueryId().genPlanNodeId(), plan.getRoot(), assignments.build());
List<ColumnSchema> insertedColumns = insertedColumnsBuilder.build();
List<Field> fields =
insertedColumns.stream()
insertColumns.stream()
.map(
column ->
Field.newUnqualified(
Expand All @@ -325,7 +299,8 @@ private RelationPlan genInsertPlan(final Analysis analysis, final Insert node) {
plan.getRoot(),
targetTable.getDatabaseName(),
table.getName().getSuffix(),
tableColumns,
insertColumns,
neededInputColumnNames,
symbolAllocator.newSymbol(Insert.ROWS, Insert.ROWS_TYPE));
return new RelationPlan(
intoNode, analysis.getRootScope(), intoNode.getOutputSymbols(), Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public List<PlanNode> visitInto(final IntoNode node, final PlanContext context)
node.getDatabase(),
node.getTable(),
node.getColumns(),
node.getNeededInputColumnNames(),
node.getRowCountSymbol());
resultNodeList.add(subIntoNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class IntoNode extends SingleChildProcessNode {
private final String database;
private final String table;
private final List<ColumnSchema> columns;
private final List<Symbol> neededInputColumnNames;
private final Symbol rowCountSymbol;

public IntoNode(
Expand All @@ -52,11 +53,17 @@ public IntoNode(
String database,
String table,
List<ColumnSchema> columns,
List<Symbol> neededInputColumnNames,
Symbol rowCountSymbol) {
super(id, child);
this.database = database;
this.table = table;
this.columns = columns;
this.neededInputColumnNames = neededInputColumnNames;
if (columns.size() != neededInputColumnNames.size()) {
throw new IllegalArgumentException(
"insert into table columns's size should be same as query result");
}
this.rowCountSymbol = rowCountSymbol;
}

Expand All @@ -67,7 +74,7 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public PlanNode clone() {
return new IntoNode(id, null, database, table, columns, rowCountSymbol);
return new IntoNode(id, null, database, table, columns, neededInputColumnNames, rowCountSymbol);
}

@Override
Expand All @@ -90,6 +97,9 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
for (ColumnSchema tableColumn : columns) {
ColumnSchema.serialize(tableColumn, byteBuffer);
}
for (Symbol column : neededInputColumnNames) {
Symbol.serialize(column, byteBuffer);
}
}

@Override
Expand All @@ -102,6 +112,9 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
for (ColumnSchema tableColumn : columns) {
ColumnSchema.serialize(tableColumn, stream);
}
for (Symbol column : neededInputColumnNames) {
Symbol.serialize(column, stream);
}
}

public static IntoNode deserialize(ByteBuffer byteBuffer) {
Expand All @@ -113,14 +126,25 @@ public static IntoNode deserialize(ByteBuffer byteBuffer) {
for (int i = 0; i < columnSize; i++) {
columns.add(ColumnSchema.deserialize(byteBuffer));
}
List<Symbol> neededInputColumnNames = new ArrayList<>(columnSize);
for (int i = 0; i < columnSize; i++) {
neededInputColumnNames.add(Symbol.deserialize(byteBuffer));
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new IntoNode(planNodeId, null, database, table, columns, rowCountSymbol);
return new IntoNode(
planNodeId, null, database, table, columns, neededInputColumnNames, rowCountSymbol);
}

@Override
public PlanNode replaceChildren(List<PlanNode> newChildren) {
return new IntoNode(
id, Iterables.getOnlyElement(newChildren), database, table, columns, rowCountSymbol);
id,
Iterables.getOnlyElement(newChildren),
database,
table,
columns,
neededInputColumnNames,
rowCountSymbol);
}

@Override
Expand All @@ -143,12 +167,14 @@ public boolean equals(Object o) {
return database.equals(that.database)
&& table.equals(that.table)
&& rowCountSymbol.equals(that.rowCountSymbol)
&& Objects.deepEquals(columns, that.columns);
&& Objects.deepEquals(columns, that.columns)
&& Objects.deepEquals(neededInputColumnNames, that.neededInputColumnNames);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, table, rowCountSymbol, columns);
return Objects.hash(
super.hashCode(), database, table, rowCountSymbol, columns, neededInputColumnNames);
}

public List<Type> getOutputType() {
Expand All @@ -167,6 +193,10 @@ public List<ColumnSchema> getColumns() {
return columns;
}

public List<Symbol> getNeededInputColumnNames() {
return neededInputColumnNames;
}

public Symbol getRowCountSymbol() {
return rowCountSymbol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,18 @@ public PlanAndMappings visitLimit(LimitNode node, UnaliasContext context) {
@Override
public PlanAndMappings visitInto(IntoNode node, UnaliasContext context) {
PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings());
SymbolMapper mapper = symbolMapper(mapping);

return new PlanAndMappings(
node.replaceChildren(ImmutableList.of(rewrittenSource.getRoot())),
new IntoNode(
node.getPlanNodeId(),
rewrittenSource.root,
node.getDatabase(),
node.getTable(),
node.getColumns(),
mapper.map(node.getNeededInputColumnNames()),
node.getRowCountSymbol()),
rewrittenSource.getMappings());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ public void testTableIntoSerde() throws IllegalPathException, IOException {
new ColumnSchema("id", StringType.STRING, false, TsTableColumnCategory.TAG),
new ColumnSchema("voltage", FloatType.FLOAT, false, TsTableColumnCategory.FIELD));

List<Symbol> neededInputColumnNames =
ImmutableList.of(new Symbol("time"), new Symbol("voltage"), new Symbol("id"));

return new org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode(
new PlanNodeId("TestIntoNode"),
null,
"testdb",
"testtb",
sourceColumns,
neededInputColumnNames,
new Symbol("rows"));
}
}
Loading