Skip to content

Commit 07d457e

Browse files
authored
Fix union when cross region or need mapping from child input
1 parent aa560e7 commit 07d457e

File tree

9 files changed

+228
-10
lines changed

9 files changed

+228
-10
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.recent;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.BeforeClass;
28+
import org.junit.experimental.categories.Category;
29+
import org.junit.runner.RunWith;
30+
31+
import static org.apache.iotdb.confignode.it.partition.IoTDBPartitionShuffleStrategyIT.SHUFFLE;
32+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
33+
34+
@RunWith(IoTDBTestRunner.class)
35+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
36+
public class IoTDBUnionTable2IT extends IoTDBUnionTableIT {
37+
38+
@BeforeClass
39+
public static void setUp() throws Exception {
40+
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(2);
41+
EnvFactory.getEnv().getConfig().getCommonConfig().setDataPartitionAllocationStrategy(SHUFFLE);
42+
EnvFactory.getEnv().initClusterEnvironment();
43+
prepareTableData(createSqls);
44+
}
45+
}

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,24 @@ public void normalTest() {
113113
DATABASE_NAME);
114114
}
115115

116+
@Test
117+
public void mappingTest() {
118+
String[] expectedHeader = new String[] {"col_a"};
119+
String[] retArray = new String[] {"1.0,", "2.0,", "3.0,"};
120+
tableResultSetEqualTest(
121+
"select col_a from ((select s1 as col_a, device as col_b from table1) union (select s2 as col_a, device as col_b from table2)) order by col_a",
122+
expectedHeader,
123+
retArray,
124+
DATABASE_NAME);
125+
126+
retArray = new String[] {"1.0,", "1.0,", "2.0,", "3.0,"};
127+
tableResultSetEqualTest(
128+
"select col_a from ((select s1 as col_a, device as col_b from table1) union all (select s2 as col_a, device as col_b from table2)) order by col_a",
129+
expectedHeader,
130+
retArray,
131+
DATABASE_NAME);
132+
}
133+
116134
@Test
117135
public void exceptionTest() {
118136
tableAssertTestFail(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ public class CollectOperator implements ProcessOperator {
3535
RamUsageEstimator.shallowSizeOfInstance(CollectOperator.class);
3636

3737
private final OperatorContext operatorContext;
38-
private final List<Operator> children;
38+
protected final List<Operator> children;
3939
private boolean inited = false;
4040

41-
private int currentIndex;
41+
protected int currentIndex;
4242

4343
public CollectOperator(OperatorContext operatorContext, List<Operator> children) {
4444
this.operatorContext = operatorContext;
@@ -62,7 +62,7 @@ public TsBlock next() throws Exception {
6262
}
6363
}
6464

65-
private void closeCurrentChild(int index) throws Exception {
65+
protected void closeCurrentChild(int index) throws Exception {
6666
children.get(index).close();
6767
children.set(index, null);
6868
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process;
21+
22+
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
23+
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
24+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
25+
26+
import org.apache.tsfile.block.column.Column;
27+
import org.apache.tsfile.read.common.block.TsBlock;
28+
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
29+
import org.apache.tsfile.utils.RamUsageEstimator;
30+
31+
import java.util.List;
32+
33+
public class MappingCollectOperator extends CollectOperator {
34+
protected static final long INSTANCE_SIZE =
35+
RamUsageEstimator.shallowSizeOfInstance(MappingCollectOperator.class);
36+
37+
// record mapping for each child
38+
private final List<List<Integer>> mappings;
39+
40+
public MappingCollectOperator(
41+
OperatorContext operatorContext, List<Operator> children, List<List<Integer>> mappings) {
42+
super(operatorContext, children);
43+
this.mappings = mappings;
44+
}
45+
46+
@Override
47+
public TsBlock next() throws Exception {
48+
if (children.get(currentIndex).hasNextWithTimer()) {
49+
TsBlock tsBlock = children.get(currentIndex).nextWithTimer();
50+
if (tsBlock == null) {
51+
return null;
52+
} else {
53+
Column[] columns = new Column[tsBlock.getValueColumnCount()];
54+
List<Integer> mapping = mappings.get(currentIndex);
55+
for (int i = 0; i < columns.length; i++) {
56+
columns[i] = tsBlock.getColumn(mapping.get(i));
57+
}
58+
return TsBlock.wrapBlocksWithoutCopy(
59+
tsBlock.getPositionCount(),
60+
new RunLengthEncodedColumn(
61+
TableScanOperator.TIME_COLUMN_TEMPLATE, tsBlock.getPositionCount()),
62+
columns);
63+
}
64+
} else {
65+
closeCurrentChild(currentIndex);
66+
currentIndex++;
67+
return null;
68+
}
69+
}
70+
71+
protected void closeCurrentChild(int index) throws Exception {
72+
children.get(index).close();
73+
children.set(index, null);
74+
mappings.set(index, null);
75+
}
76+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.iotdb.db.queryengine.execution.operator.process.EnforceSingleRowOperator;
5252
import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
5353
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
54+
import org.apache.iotdb.db.queryengine.execution.operator.process.MappingCollectOperator;
5455
import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
5556
import org.apache.iotdb.db.queryengine.execution.operator.process.PatternRecognitionOperator;
5657
import org.apache.iotdb.db.queryengine.execution.operator.process.PreviousFillWithGroupOperator;
@@ -246,6 +247,7 @@
246247
import com.google.common.collect.ImmutableList;
247248
import com.google.common.collect.ImmutableMap;
248249
import com.google.common.collect.ImmutableSet;
250+
import com.google.common.collect.ListMultimap;
249251
import org.apache.tsfile.block.column.Column;
250252
import org.apache.tsfile.common.conf.TSFileConfig;
251253
import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -4140,7 +4142,21 @@ public Operator visitUnion(UnionNode node, LocalExecutionPlanContext context) {
41404142
.addOperatorContext(
41414143
context.getNextOperatorId(),
41424144
node.getPlanNodeId(),
4143-
CollectOperator.class.getSimpleName());
4144-
return new CollectOperator(operatorContext, children);
4145+
MappingCollectOperator.class.getSimpleName());
4146+
4147+
int size = children.size();
4148+
List<List<Integer>> mappings = new ArrayList<>(size);
4149+
List<Symbol> unionOutputs = node.getOutputSymbols();
4150+
ListMultimap<Symbol, Symbol> outputToInputs = node.getSymbolMapping();
4151+
for (int i = 0; i < size; i++) {
4152+
Map<Symbol, Integer> childOutputs =
4153+
makeLayoutFromOutputSymbols(node.getChildren().get(i).getOutputSymbols());
4154+
int finalI = i;
4155+
mappings.add(
4156+
unionOutputs.stream()
4157+
.map(symbol -> childOutputs.get(outputToInputs.get(symbol).get(finalI)))
4158+
.collect(Collectors.toList()));
4159+
}
4160+
return new MappingCollectOperator(operatorContext, children, mappings);
41454161
}
41464162
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@
101101
import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
102102

103103
import com.google.common.collect.ImmutableList;
104+
import com.google.common.collect.ImmutableListMultimap;
104105
import com.google.common.collect.ImmutableSet;
106+
import com.google.common.collect.ListMultimap;
105107
import org.apache.tsfile.common.conf.TSFileConfig;
106108
import org.apache.tsfile.file.metadata.IDeviceID;
107109
import org.apache.tsfile.read.common.type.LongType;
@@ -111,6 +113,7 @@
111113
import javax.annotation.Nonnull;
112114

113115
import java.util.ArrayList;
116+
import java.util.Collection;
114117
import java.util.Collections;
115118
import java.util.Comparator;
116119
import java.util.HashMap;
@@ -1779,7 +1782,36 @@ public List<PlanNode> visitWindowFunction(WindowNode node, PlanContext context)
17791782
@Override
17801783
public List<PlanNode> visitUnion(UnionNode node, PlanContext context) {
17811784
context.clearExpectedOrderingScheme();
1782-
return visitMultiChildProcess(node, context);
1785+
List<List<PlanNode>> children =
1786+
node.getChildren().stream()
1787+
.map(child -> child.accept(this, context))
1788+
.collect(toImmutableList());
1789+
1790+
List<PlanNode> newUnionChildren =
1791+
children.stream().flatMap(Collection::stream).collect(toImmutableList());
1792+
1793+
// after rewrite, we need to reconstruct SymbolMapping
1794+
ListMultimap<Symbol, Symbol> oldSymbolMapping = node.getSymbolMapping();
1795+
ImmutableListMultimap.Builder<Symbol, Symbol> newSymbolMapping =
1796+
ImmutableListMultimap.builder();
1797+
for (Symbol symbol : oldSymbolMapping.keySet()) {
1798+
List<Symbol> oldSymbols = oldSymbolMapping.get(symbol);
1799+
for (int i = 0; i < oldSymbols.size(); i++) {
1800+
Symbol target = oldSymbols.get(i);
1801+
int duplicateSize = children.get(i).size();
1802+
// add the same Symbol for all children spilt from one original node
1803+
while (duplicateSize > 0) {
1804+
newSymbolMapping.put(symbol, target);
1805+
duplicateSize--;
1806+
}
1807+
}
1808+
}
1809+
return Collections.singletonList(
1810+
new UnionNode(
1811+
node.getPlanNodeId(),
1812+
newUnionChildren,
1813+
newSymbolMapping.build(),
1814+
node.getOutputSymbols()));
17831815
}
17841816

17851817
public static class PlanContext {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@
4646
import static java.util.Objects.requireNonNull;
4747

4848
public abstract class SetOperationNode extends MultiChildProcessNode {
49-
// Corresponding is not supported in UNION now, this field can be used for future expansion.
50-
// We don't need to serialize this field now, consider it when support Corresponding.
51-
private final transient ListMultimap<Symbol, Symbol> outputToInputs;
49+
private final ListMultimap<Symbol, Symbol> outputToInputs;
5250
private final List<Symbol> outputs;
5351

5452
protected SetOperationNode(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
6969
PlanNodeType.TABLE_UNION_NODE.serialize(byteBuffer);
7070
ReadWriteIOUtils.write(getOutputSymbols().size(), byteBuffer);
7171
getOutputSymbols().forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
72+
73+
ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
74+
ReadWriteIOUtils.write(multimap.size(), byteBuffer);
75+
for (Symbol key : multimap.keySet()) {
76+
Symbol.serialize(key, byteBuffer);
77+
ReadWriteIOUtils.write(multimap.get(key).size(), byteBuffer);
78+
for (Symbol value : multimap.get(key)) {
79+
Symbol.serialize(value, byteBuffer);
80+
}
81+
}
7282
}
7383

7484
@Override
@@ -78,6 +88,16 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
7888
for (Symbol symbol : getOutputSymbols()) {
7989
Symbol.serialize(symbol, stream);
8090
}
91+
92+
ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
93+
ReadWriteIOUtils.write(multimap.keySet().size(), stream);
94+
for (Symbol key : multimap.keySet()) {
95+
Symbol.serialize(key, stream);
96+
ReadWriteIOUtils.write(multimap.get(key).size(), stream);
97+
for (Symbol value : multimap.get(key)) {
98+
Symbol.serialize(value, stream);
99+
}
100+
}
81101
}
82102

83103
public static UnionNode deserialize(ByteBuffer byteBuffer) {
@@ -86,8 +106,17 @@ public static UnionNode deserialize(ByteBuffer byteBuffer) {
86106
while (size-- > 0) {
87107
outputs.add(Symbol.deserialize(byteBuffer));
88108
}
109+
ImmutableListMultimap.Builder<Symbol, Symbol> builder = ImmutableListMultimap.builder();
110+
size = ReadWriteIOUtils.readInt(byteBuffer);
111+
while (size-- > 0) {
112+
Symbol key = Symbol.deserialize(byteBuffer);
113+
int valueSize = ReadWriteIOUtils.readInt(byteBuffer);
114+
for (int i = 0; i < valueSize; i++) {
115+
builder.put(key, Symbol.deserialize(byteBuffer));
116+
}
117+
}
89118
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
90-
return new UnionNode(planNodeId, ImmutableListMultimap.of(), outputs);
119+
return new UnionNode(planNodeId, builder.build(), outputs);
91120
}
92121

93122
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ public class CompatibleResolver {
9595
addCondition(UNKNOWN, DOUBLE, DOUBLE);
9696
addCondition(UNKNOWN, DATE, DATE);
9797
addCondition(UNKNOWN, TIMESTAMP, TIMESTAMP);
98+
addCondition(UNKNOWN, BOOLEAN, BOOLEAN);
99+
addCondition(UNKNOWN, TEXT, TEXT);
100+
addCondition(UNKNOWN, STRING, STRING);
101+
addCondition(UNKNOWN, BLOB, BLOB);
98102
}
99103

100104
private static void addCondition(Type condition1, Type condition2, Type result) {

0 commit comments

Comments
 (0)