Skip to content

Commit c5eacf0

Browse files
authored
Fix error in single device with sort + offset + limit align by device query
1 parent 05bc4fd commit c5eacf0

File tree

5 files changed

+142
-4
lines changed

5 files changed

+142
-4
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,30 @@ public static void tearDown() throws Exception {
5353
EnvFactory.getEnv().cleanClusterEnvironment();
5454
}
5555

56+
String[] expectedHeader;
57+
String[] retArray;
58+
59+
@Test
60+
public void singleDeviceTest() {
61+
expectedHeader = new String[] {"Time,Device,precipitation"};
62+
retArray = new String[] {"1668960000200,root.weather.London,1667492178318,"};
63+
resultSetEqualTest(
64+
"select precipitation from root.weather.London where precipitation>1667492178118 order by time offset 1 limit 1 align by device",
65+
expectedHeader,
66+
retArray);
67+
68+
retArray = new String[] {"1668960000200,root.weather.London,1667492178318,"};
69+
resultSetEqualTest(
70+
"select precipitation from root.weather.London where precipitation>1667492178118 order by precipitation offset 1 limit 1 align by device",
71+
expectedHeader,
72+
retArray);
73+
}
74+
5675
@Test
5776
public void orderByCanNotPushLimitTest() {
5877
// 1. value filter, can not push down LIMIT
59-
String[] expectedHeader = new String[] {"Time,Device,s1"};
60-
String[] retArray = new String[] {"3,root.db.d1,111,"};
78+
expectedHeader = new String[] {"Time,Device,s1"};
79+
retArray = new String[] {"3,root.db.d1,111,"};
6180
resultSetEqualTest(
6281
"SELECT * FROM root.db.** WHERE s1>40 ORDER BY TIME LIMIT 1 ALIGN BY DEVICE;",
6382
expectedHeader,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ public LogicalPlanBuilder planDeviceView(
520520
? queryStatement.getRowOffset() + queryStatement.getRowLimit()
521521
: queryStatement.getRowLimit();
522522

523-
if (canUseTopKNode(queryStatement, limitValue)) {
523+
if (canUseTopKNode(queryStatement, limitValue) && deviceNameToSourceNodesMap.size() > 1) {
524524
TopKNode topKNode =
525525
new TopKNode(
526526
context.getQueryId().genPlanNodeId(),
@@ -552,7 +552,8 @@ public LogicalPlanBuilder planDeviceView(
552552

553553
analysis.setUseTopKNode();
554554
this.root = topKNode;
555-
} else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size())) {
555+
} else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size())
556+
&& deviceNameToSourceNodesMap.size() > 1) {
556557
// use MergeSortNode + SingleDeviceViewNode
557558
MergeSortNode mergeSortNode =
558559
new MergeSortNode(

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@
3030
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
3131
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
3232
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
33+
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
3334
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
3435
import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
36+
import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
37+
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
38+
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
3539
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
3640

3741
import org.junit.Assert;
@@ -51,6 +55,7 @@
5155
import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries;
5256
import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.schemaMap;
5357

58+
/** Use optimize rule: LimitOffsetPushDown and OrderByExpressionWithLimitChangeToTopK */
5459
public class LimitOffsetPushDownTest {
5560

5661
@Test
@@ -139,6 +144,7 @@ public void testPushDownWithFill() {
139144

140145
@Test
141146
public void testPushDownAlignByDevice() {
147+
// non aligned device
142148
checkPushDown(
143149
"select s1 from root.sg.d1 limit 100 offset 100 align by device;",
144150
new TestPlanBuilder()
@@ -151,6 +157,71 @@ public void testPushDownAlignByDevice() {
151157
.scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
152158
.singleDeviceView("1", "root.sg.d1", "s1")
153159
.getRoot());
160+
161+
OrderByParameter orderByParameter =
162+
new OrderByParameter(
163+
Arrays.asList(
164+
new SortItem(OrderByKey.TIME, Ordering.ASC),
165+
new SortItem(OrderByKey.DEVICE, Ordering.ASC)));
166+
checkPushDown(
167+
"select s1 from root.sg.d1 order by time asc limit 100 offset 100 align by device;",
168+
new TestPlanBuilder()
169+
.scan("0", schemaMap.get("root.sg.d1.s1"), 200)
170+
.singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1")
171+
.offset("2", 100)
172+
.limit("3", 100)
173+
.getRoot(),
174+
new TestPlanBuilder()
175+
.scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
176+
.singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1")
177+
.getRoot());
178+
179+
// can not push down
180+
orderByParameter =
181+
new OrderByParameter(
182+
Arrays.asList(
183+
new SortItem("s1", Ordering.ASC),
184+
new SortItem("DEVICE", Ordering.ASC),
185+
new SortItem("TIME", Ordering.ASC)));
186+
checkPushDown(
187+
"select s1 from root.sg.d1 order by s1 asc limit 100 offset 100 align by device;",
188+
new TestPlanBuilder()
189+
.scan("0", schemaMap.get("root.sg.d1.s1"))
190+
.singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1")
191+
.sort("2", orderByParameter)
192+
.offset("3", 100)
193+
.limit("4", 100)
194+
.getRoot(),
195+
new TestPlanBuilder()
196+
.scan("0", schemaMap.get("root.sg.d1.s1"))
197+
.singleOrderedDeviceView("1", "root.sg.d1", orderByParameter, "s1")
198+
.topK("5", 200, orderByParameter, Arrays.asList("Device", "s1"))
199+
.offset("3", 100)
200+
.limit("4", 100)
201+
.getRoot());
202+
203+
orderByParameter =
204+
new OrderByParameter(
205+
Arrays.asList(
206+
new SortItem("s1", Ordering.ASC),
207+
new SortItem("DEVICE", Ordering.ASC),
208+
new SortItem("TIME", Ordering.ASC)));
209+
checkPushDown(
210+
"select s1,s2 from root.sg.d2.a order by s1 asc limit 100 offset 100 align by device;",
211+
new TestPlanBuilder()
212+
.scanAligned("0", schemaMap.get("root.sg.d2.a"))
213+
.singleOrderedDeviceView("1", "root.sg.d2.a", orderByParameter, "s1", "s2")
214+
.sort("2", orderByParameter)
215+
.offset("3", 100)
216+
.limit("4", 100)
217+
.getRoot(),
218+
new TestPlanBuilder()
219+
.scanAligned("0", schemaMap.get("root.sg.d2.a"))
220+
.singleOrderedDeviceView("1", "root.sg.d2.a", orderByParameter, "s1", "s2")
221+
.topK("5", 200, orderByParameter, Arrays.asList("Device", "s1"))
222+
.offset("3", 100)
223+
.limit("4", 100)
224+
.getRoot());
154225
}
155226

156227
@Test

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ public static void checkPushDown(
129129
Assert.assertEquals(rawPlan, actualPlan);
130130

131131
PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context);
132+
actualOptPlan =
133+
new OrderByExpressionWithLimitChangeToTopK().optimize(actualOptPlan, analysis, context);
132134
Assert.assertEquals(optPlan, actualOptPlan);
133135
}
134136

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
3636
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
3737
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
38+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
39+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
3840
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
3941
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
4042
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
@@ -82,6 +84,13 @@ public TestPlanBuilder scan(String id, PartialPath path) {
8284
return this;
8385
}
8486

87+
public TestPlanBuilder scan(String id, PartialPath path, long pushDownLimit) {
88+
SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path);
89+
node.setPushDownLimit(pushDownLimit);
90+
this.root = node;
91+
return this;
92+
}
93+
8594
public TestPlanBuilder scanAligned(String id, PartialPath path) {
8695
this.root = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path);
8796
return this;
@@ -360,6 +369,42 @@ public TestPlanBuilder singleDeviceView(String id, String device, String measure
360369
return this;
361370
}
362371

372+
public TestPlanBuilder sort(String id, OrderByParameter orderParameter) {
373+
this.root = new SortNode(new PlanNodeId(id), getRoot(), orderParameter);
374+
return this;
375+
}
376+
377+
public TestPlanBuilder topK(
378+
String id, int topKValue, OrderByParameter mergeOrderParameter, List<String> outputColumns) {
379+
this.root =
380+
new TopKNode(
381+
new PlanNodeId(id),
382+
topKValue,
383+
Collections.singletonList(getRoot()),
384+
mergeOrderParameter,
385+
outputColumns);
386+
return this;
387+
}
388+
389+
public TestPlanBuilder singleOrderedDeviceView(
390+
String id, String device, OrderByParameter orderByParameter, String... measurement) {
391+
IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(device);
392+
Map<IDeviceID, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
393+
deviceToMeasurementIndexesMap.put(
394+
deviceID, measurement.length == 1 ? Collections.singletonList(1) : Arrays.asList(1, 2));
395+
DeviceViewNode deviceViewNode =
396+
new DeviceViewNode(
397+
new PlanNodeId(id),
398+
orderByParameter,
399+
measurement.length == 1
400+
? Arrays.asList(DEVICE, measurement[0])
401+
: Arrays.asList(DEVICE, measurement[0], measurement[1]),
402+
deviceToMeasurementIndexesMap);
403+
deviceViewNode.addChildDeviceNode(deviceID, getRoot());
404+
this.root = deviceViewNode;
405+
return this;
406+
}
407+
363408
public TestPlanBuilder filter(
364409
String id, List<Expression> expressions, Expression predicate, boolean isGroupByTime) {
365410
this.root =

0 commit comments

Comments
 (0)