Skip to content

Commit c30af2c

Browse files
authored
Add size control of result for SimpleNestedLoopCrossJoinOperator
Signed-off-by: Weihao Li <[email protected]>
1 parent b142929 commit c30af2c

File tree

2 files changed

+138
-1
lines changed

2 files changed

+138
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public TsBlock next() throws Exception {
120120
cachedProbeBlock = null;
121121
return null;
122122
}
123-
while (probeIndex < cachedProbeBlock.getPositionCount()
123+
while (!resultBuilder.isFull()
124+
&& probeIndex < cachedProbeBlock.getPositionCount()
124125
&& System.nanoTime() - start < maxRuntime) {
125126
for (TsBlock buildBlock : buildBlocks) {
126127
appendValueToResult(probeIndex, buildBlock);

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,21 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
2121

22+
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
24+
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
25+
import org.apache.iotdb.db.queryengine.common.QueryId;
26+
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
27+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
28+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
29+
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
30+
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
31+
import org.apache.iotdb.db.queryengine.execution.operator.process.join.SimpleNestedLoopCrossJoinOperator;
32+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
2233
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
2334
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
2435
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
36+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
2537
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
2638
import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
2739
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -47,13 +59,21 @@
4759
import com.google.common.collect.ImmutableList;
4860
import com.google.common.collect.ImmutableMap;
4961
import com.google.common.collect.ImmutableSet;
62+
import com.google.common.util.concurrent.ListenableFuture;
63+
import org.apache.tsfile.block.column.ColumnBuilder;
64+
import org.apache.tsfile.enums.TSDataType;
65+
import org.apache.tsfile.read.common.block.TsBlock;
66+
import org.apache.tsfile.read.common.block.TsBlockBuilder;
67+
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
5068
import org.junit.Ignore;
5169
import org.junit.Test;
5270

5371
import java.util.Collections;
5472
import java.util.List;
5573
import java.util.Optional;
5674

75+
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
76+
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
5777
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
5878
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.ALL_DEVICE_ENTRIES;
5979
import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.BEIJING_A1_DEVICE_ENTRY;
@@ -87,6 +107,7 @@
87107
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN;
88108
import static org.junit.Assert.assertEquals;
89109
import static org.junit.Assert.assertTrue;
110+
import static org.junit.Assert.fail;
90111

91112
public class JoinTest {
92113
Analysis analysis;
@@ -811,4 +832,119 @@ public void joinSortEliminationTest() {
811832

812833
assertPlan(planTester.getFragmentPlan(8), tableScan("testdb.table1"));
813834
}
835+
836+
@Test
837+
// case: lines of result are more than Integer.MAX_VALUE
838+
public void crossJoinLargeDataTest() {
839+
// lines = 46341
840+
int lines = ((int) Math.sqrt(Integer.MAX_VALUE)) + 1;
841+
try (SimpleNestedLoopCrossJoinOperator aggregationOperator =
842+
genSimpleNestedLoopCrossJoinOperator(lines)) {
843+
ListenableFuture<?> listenableFuture = aggregationOperator.isBlocked();
844+
listenableFuture.get();
845+
long count = 0;
846+
while (!aggregationOperator.isFinished() && aggregationOperator.hasNext()) {
847+
TsBlock tsBlock = aggregationOperator.next();
848+
if (tsBlock != null) {
849+
count += tsBlock.getPositionCount();
850+
}
851+
listenableFuture = aggregationOperator.isBlocked();
852+
listenableFuture.get();
853+
}
854+
assertEquals((long) lines * lines, count);
855+
} catch (Exception e) {
856+
e.printStackTrace();
857+
fail(e.getMessage());
858+
}
859+
}
860+
861+
private SimpleNestedLoopCrossJoinOperator genSimpleNestedLoopCrossJoinOperator(int lines) {
862+
863+
// Construct operator tree
864+
QueryId queryId = new QueryId("stub_query");
865+
866+
FragmentInstanceId instanceId =
867+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
868+
FragmentInstanceStateMachine stateMachine =
869+
new FragmentInstanceStateMachine(
870+
instanceId,
871+
IoTDBThreadPoolFactory.newFixedThreadPool(
872+
1, "SimpleNestedLoopCrossJoinOperator-test-instance-notification"));
873+
FragmentInstanceContext fragmentInstanceContext =
874+
createFragmentInstanceContext(instanceId, stateMachine);
875+
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
876+
driverContext.addOperatorContext(
877+
1, new PlanNodeId("1"), TableScanOperator.class.getSimpleName());
878+
driverContext.addOperatorContext(
879+
2, new PlanNodeId("2"), TableScanOperator.class.getSimpleName());
880+
driverContext.addOperatorContext(
881+
3, new PlanNodeId("3"), SimpleNestedLoopCrossJoinOperator.class.getSimpleName());
882+
return new SimpleNestedLoopCrossJoinOperator(
883+
driverContext.getOperatorContexts().get(2),
884+
genChildOperator(driverContext.getOperatorContexts().get(0), lines),
885+
genChildOperator(driverContext.getOperatorContexts().get(1), lines),
886+
new int[0],
887+
new int[1],
888+
Collections.singletonList(TSDataType.TIMESTAMP));
889+
}
890+
891+
private Operator genChildOperator(OperatorContext operatorContext, int lines) {
892+
return new Operator() {
893+
boolean finished = false;
894+
895+
@Override
896+
public OperatorContext getOperatorContext() {
897+
return operatorContext;
898+
}
899+
900+
@Override
901+
public TsBlock next() {
902+
TsBlockBuilder builder =
903+
new TsBlockBuilder(Collections.singletonList(TSDataType.TIMESTAMP));
904+
ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
905+
for (int i = 0; i < lines; i++) {
906+
columnBuilder.writeLong(1);
907+
}
908+
builder.declarePositions(lines);
909+
TsBlock result =
910+
builder.build(
911+
new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount()));
912+
finished = true;
913+
return result;
914+
}
915+
916+
@Override
917+
public boolean hasNext() throws Exception {
918+
return !finished;
919+
}
920+
921+
@Override
922+
public void close() throws Exception {}
923+
924+
@Override
925+
public boolean isFinished() throws Exception {
926+
return finished;
927+
}
928+
929+
@Override
930+
public long calculateMaxPeekMemory() {
931+
return 0;
932+
}
933+
934+
@Override
935+
public long calculateMaxReturnSize() {
936+
return 0;
937+
}
938+
939+
@Override
940+
public long calculateRetainedSizeAfterCallingNext() {
941+
return 0;
942+
}
943+
944+
@Override
945+
public long ramBytesUsed() {
946+
return 0;
947+
}
948+
};
949+
}
814950
}

0 commit comments

Comments
 (0)