Skip to content

Commit d6ef8cb

Browse files
committed
fix LastQueryScanNode sort
1 parent 8e99486 commit d6ef8cb

File tree

3 files changed

+72
-10
lines changed

3 files changed

+72
-10
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,15 +1086,16 @@ private void addSortForEachLastQueryNode(PlanNode root, Ordering timeseriesOrder
10861086
if (child instanceof LastQueryScanNode) {
10871087
// sort the measurements for LastQueryMergeOperator
10881088
LastQueryScanNode node = (LastQueryScanNode) child;
1089-
((LastQueryScanNode) child)
1090-
.getIdxOfMeasurementSchemas()
1091-
.sort(
1092-
Comparator.comparing(
1093-
idx ->
1094-
new Binary(
1095-
node.getMeasurementSchema(idx).getMeasurementName(),
1096-
TSFileConfig.STRING_CHARSET),
1097-
Comparator.naturalOrder()));
1089+
List<Integer> sorted =
1090+
new ArrayList<>(((LastQueryScanNode) child).getIdxOfMeasurementSchemas());
1091+
sorted.sort(
1092+
Comparator.comparing(
1093+
idx ->
1094+
new Binary(
1095+
node.getMeasurementSchema(idx).getMeasurementName(),
1096+
TSFileConfig.STRING_CHARSET),
1097+
Comparator.naturalOrder()));
1098+
node.setIndexOfMeasurementSchemas(sorted);
10981099
}
10991100
});
11001101
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
5959

6060
private final PartialPath devicePath;
6161
private final boolean aligned;
62-
private final List<Integer> indexOfMeasurementSchemas;
62+
private List<Integer> indexOfMeasurementSchemas;
6363
// This structure does not need to be serialized or deserialized.
6464
// It will be set when the current Node is added to the child by the upper LastQueryNode.
6565
private List<IMeasurementSchema> globalMeasurementSchemaList;
@@ -367,6 +367,10 @@ public List<Integer> getIdxOfMeasurementSchemas() {
367367
return indexOfMeasurementSchemas;
368368
}
369369

370+
public void setIndexOfMeasurementSchemas(List<Integer> indexOfMeasurementSchemas) {
371+
this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
372+
}
373+
370374
public List<IMeasurementSchema> getMeasurementSchemas() {
371375
return indexOfMeasurementSchemas.stream()
372376
.map(globalMeasurementSchemaList::get)

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,22 @@
2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.commons.exception.IllegalPathException;
2424
import org.apache.iotdb.commons.path.MeasurementPath;
25+
import org.apache.iotdb.commons.path.PartialPath;
2526
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
2627
import org.apache.iotdb.db.queryengine.common.QueryId;
28+
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
2729
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
2830
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
2931
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
32+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
3033
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
3134
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
3235
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode;
3336
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode;
37+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode;
3438

39+
import org.apache.tsfile.enums.TSDataType;
40+
import org.apache.tsfile.write.schema.MeasurementSchema;
3541
import org.junit.Assert;
3642
import org.junit.Test;
3743

@@ -41,6 +47,57 @@
4147

4248
public class LastQueryTest {
4349

50+
@Test
51+
public void testSortLastQueryScanNode() throws IllegalPathException {
52+
LastQueryNode lastQueryNode = new LastQueryNode(new PlanNodeId("test"), null, true);
53+
54+
lastQueryNode.addDeviceLastQueryScanNode(
55+
new PlanNodeId("test_last_query_scan1"),
56+
new PartialPath("root.test.d1"),
57+
true,
58+
Arrays.asList(
59+
new MeasurementSchema("s3", TSDataType.INT32),
60+
new MeasurementSchema("s1", TSDataType.BOOLEAN),
61+
new MeasurementSchema("s2", TSDataType.INT32)),
62+
null,
63+
null);
64+
lastQueryNode.addDeviceLastQueryScanNode(
65+
new PlanNodeId("test_last_query_scan2"),
66+
new PartialPath("root.test.d0"),
67+
false,
68+
Collections.singletonList(new MeasurementSchema("s0", TSDataType.BOOLEAN)),
69+
null,
70+
null);
71+
72+
Analysis analysis = Util.constructAnalysis();
73+
SourceRewriter sourceRewriter = new SourceRewriter(analysis);
74+
DistributionPlanContext context =
75+
new DistributionPlanContext(
76+
new MPPQueryContext("", new QueryId("test"), null, new TEndPoint(), new TEndPoint()));
77+
context.setOneSeriesInMultiRegion(true);
78+
context.setQueryMultiRegion(true);
79+
List<PlanNode> result = sourceRewriter.visitLastQuery(lastQueryNode, context);
80+
Assert.assertEquals(1, result.size());
81+
Assert.assertTrue(result.get(0) instanceof LastQueryMergeNode);
82+
LastQueryMergeNode mergeNode = (LastQueryMergeNode) result.get(0);
83+
Assert.assertEquals(1, mergeNode.getChildren().size());
84+
Assert.assertTrue(mergeNode.getChildren().get(0) instanceof LastQueryNode);
85+
86+
LastQueryNode lastQueryNode2 = (LastQueryNode) mergeNode.getChildren().get(0);
87+
Assert.assertEquals(2, lastQueryNode2.getChildren().size());
88+
Assert.assertTrue(lastQueryNode2.getChildren().get(0) instanceof LastQueryScanNode);
89+
90+
LastQueryScanNode scanNodeChild1 = (LastQueryScanNode) lastQueryNode2.getChildren().get(0);
91+
Assert.assertTrue(scanNodeChild1.getDevicePath().toString().contains("d0"));
92+
Assert.assertEquals("s0", scanNodeChild1.getMeasurementSchemas().get(0).getMeasurementName());
93+
94+
LastQueryScanNode scanNodeChild2 = (LastQueryScanNode) lastQueryNode2.getChildren().get(1);
95+
Assert.assertTrue(scanNodeChild2.getDevicePath().toString().contains("d1"));
96+
Assert.assertEquals("s1", scanNodeChild2.getMeasurementSchemas().get(0).getMeasurementName());
97+
Assert.assertEquals("s2", scanNodeChild2.getMeasurementSchemas().get(1).getMeasurementName());
98+
Assert.assertEquals("s3", scanNodeChild2.getMeasurementSchemas().get(2).getMeasurementName());
99+
}
100+
44101
@Test
45102
public void testLastQuery1Series1Region() throws IllegalPathException {
46103
String d2s1Path = "root.sg.d22.s1";

0 commit comments

Comments
 (0)