Skip to content

Commit d86b86e

Browse files
authored
Merge LastQueryScanNode of same device
1 parent 5cda97b commit d86b86e

34 files changed

+931
-661
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ public class MPPQueryContext {
8282
// constructing some Expression and PlanNode.
8383
private final MemoryReservationManager memoryReservationManager;
8484

85+
private static final int minSizeToUseSampledTimeseriesOperandMemCost = 100;
86+
private double avgTimeseriesOperandMemCost = 0;
87+
private int numsOfSampledTimeseriesOperand = 0;
88+
// When there is no view in a last query and no device exists in multiple regions,
89+
// the updateScanNum process in distributed planning can be skipped.
90+
private boolean needUpdateScanNumForLastQuery = false;
91+
8592
private boolean userQuery = false;
8693

8794
public MPPQueryContext(QueryId queryId) {
@@ -344,8 +351,30 @@ public void releaseMemoryReservedForFrontEnd(final long bytes) {
344351
this.memoryReservationManager.releaseMemoryCumulatively(bytes);
345352
}
346353

354+
public boolean useSampledAvgTimeseriesOperandMemCost() {
355+
return numsOfSampledTimeseriesOperand >= minSizeToUseSampledTimeseriesOperandMemCost;
356+
}
357+
358+
public long getAvgTimeseriesOperandMemCost() {
359+
return (long) avgTimeseriesOperandMemCost;
360+
}
361+
362+
public void calculateAvgTimeseriesOperandMemCost(long current) {
363+
numsOfSampledTimeseriesOperand++;
364+
avgTimeseriesOperandMemCost +=
365+
(current - avgTimeseriesOperandMemCost) / numsOfSampledTimeseriesOperand;
366+
}
367+
347368
// endregion
348369

370+
public boolean needUpdateScanNumForLastQuery() {
371+
return needUpdateScanNumForLastQuery;
372+
}
373+
374+
public void setNeedUpdateScanNumForLastQuery(boolean needUpdateScanNumForLastQuery) {
375+
this.needUpdateScanNumForLastQuery = needUpdateScanNumForLastQuery;
376+
}
377+
349378
public Optional<String> getDatabaseName() {
350379
return session.getDatabaseName();
351380
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828

2929
import javax.annotation.Nullable;
3030

31+
import java.util.ArrayList;
3132
import java.util.Arrays;
33+
import java.util.List;
3234

3335
public class MemoryEstimationHelper {
3436

@@ -41,6 +43,11 @@ public class MemoryEstimationHelper {
4143
private static final long MEASUREMENT_PATH_INSTANCE_SIZE =
4244
RamUsageEstimator.shallowSizeOfInstance(AlignedPath.class);
4345

46+
private static final long ARRAY_LIST_INSTANCE_SIZE =
47+
RamUsageEstimator.shallowSizeOfInstance(ArrayList.class);
48+
private static final long INTEGER_INSTANCE_SIZE =
49+
RamUsageEstimator.shallowSizeOfInstance(Integer.class);
50+
4451
private MemoryEstimationHelper() {
4552
// hide the constructor
4653
}
@@ -91,4 +98,25 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
9198
}
9299
return totalSize;
93100
}
101+
102+
// This method should only be called if the content in the current PartialPath comes from other
103+
// structures whose memory cost have already been calculated.
104+
public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) {
105+
if (partialPath == null) {
106+
return 0;
107+
}
108+
return PARTIAL_PATH_INSTANCE_SIZE + RamUsageEstimator.shallowSizeOf(partialPath.getNodes());
109+
}
110+
111+
public static long getEstimatedSizeOfIntegerArrayList(List<Integer> integerArrayList) {
112+
if (integerArrayList == null) {
113+
return 0L;
114+
}
115+
long size = ARRAY_LIST_INSTANCE_SIZE;
116+
size +=
117+
(long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
118+
+ (long) integerArrayList.size() * (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF;
119+
size += INTEGER_INSTANCE_SIZE * integerArrayList.size();
120+
return RamUsageEstimator.alignObjectSize(size);
121+
}
94122
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626

2727
import javax.annotation.concurrent.GuardedBy;
2828

29-
import java.util.HashMap;
3029
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import java.util.concurrent.locks.ReentrantLock;
3333

@@ -43,7 +43,7 @@ public class DataNodeQueryContext {
4343
private final ReentrantLock lock = new ReentrantLock();
4444

4545
public DataNodeQueryContext(int dataNodeFINum) {
46-
this.uncachedPathToSeriesScanInfo = new HashMap<>();
46+
this.uncachedPathToSeriesScanInfo = new ConcurrentHashMap<>();
4747
this.dataNodeFINum = new AtomicInteger(dataNodeFINum);
4848
}
4949

@@ -59,15 +59,24 @@ public Pair<AtomicInteger, TimeValuePair> getSeriesScanInfo(PartialPath path) {
5959
return uncachedPathToSeriesScanInfo.get(path);
6060
}
6161

62+
public Map<PartialPath, Pair<AtomicInteger, TimeValuePair>> getUncachedPathToSeriesScanInfo() {
63+
return uncachedPathToSeriesScanInfo;
64+
}
65+
6266
public int decreaseDataNodeFINum() {
6367
return dataNodeFINum.decrementAndGet();
6468
}
6569

66-
public void lock() {
67-
lock.lock();
70+
public void lock(boolean isDeviceInMultiRegion) {
71+
// When a device exists in only one region, there will be no intermediate state.
72+
if (isDeviceInMultiRegion) {
73+
lock.lock();
74+
}
6875
}
6976

70-
public void unLock() {
71-
lock.unlock();
77+
public void unLock(boolean isDeviceInMultiRegion) {
78+
if (isDeviceInMultiRegion) {
79+
lock.unlock();
80+
}
7281
}
7382
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,15 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator
6464

6565
protected String databaseName;
6666

67+
protected boolean deviceInMultiRegion;
68+
6769
protected AbstractUpdateLastCacheOperator(
6870
final OperatorContext operatorContext,
6971
final Operator child,
7072
final TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager,
7173
final boolean needUpdateCache,
72-
final boolean needUpdateNullEntry) {
74+
final boolean needUpdateNullEntry,
75+
final boolean deviceInMultiRegion) {
7376
this.operatorContext = operatorContext;
7477
this.child = child;
7578
this.lastCache = treeDeviceSchemaCacheManager;
@@ -78,6 +81,7 @@ protected AbstractUpdateLastCacheOperator(
7881
this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1);
7982
this.dataNodeQueryContext =
8083
operatorContext.getDriverContext().getFragmentInstanceContext().getDataNodeQueryContext();
84+
this.deviceInMultiRegion = deviceInMultiRegion;
8185
}
8286

8387
@Override
@@ -106,7 +110,7 @@ protected void mayUpdateLastCache(
106110
return;
107111
}
108112
try {
109-
dataNodeQueryContext.lock();
113+
dataNodeQueryContext.lock(deviceInMultiRegion);
110114
final Pair<AtomicInteger, TimeValuePair> seriesScanInfo =
111115
dataNodeQueryContext.getSeriesScanInfo(fullPath);
112116

@@ -115,6 +119,20 @@ protected void mayUpdateLastCache(
115119
return;
116120
}
117121

122+
if (!deviceInMultiRegion) {
123+
lastCache.updateLastCacheIfExists(
124+
getDatabaseName(),
125+
fullPath.getIDeviceID(),
126+
new String[] {fullPath.getMeasurement()},
127+
new TimeValuePair[] {
128+
Objects.nonNull(value)
129+
? new TimeValuePair(time, value)
130+
: needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null
131+
},
132+
fullPath.isUnderAlignedEntity(),
133+
new IMeasurementSchema[] {fullPath.getMeasurementSchema()});
134+
return;
135+
}
118136
// update cache in DataNodeQueryContext
119137
if (seriesScanInfo.right == null || time > seriesScanInfo.right.getTimestamp()) {
120138
if (Objects.nonNull(value)) {
@@ -135,7 +153,7 @@ protected void mayUpdateLastCache(
135153
new IMeasurementSchema[] {fullPath.getMeasurementSchema()});
136154
}
137155
} finally {
138-
dataNodeQueryContext.unLock();
156+
dataNodeQueryContext.unLock(deviceInMultiRegion);
139157
}
140158
}
141159

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,15 @@ public AlignedUpdateLastCacheOperator(
4747
AlignedPath seriesPath,
4848
TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager,
4949
boolean needUpdateCache,
50-
boolean needUpdateNullEntry) {
50+
boolean needUpdateNullEntry,
51+
boolean deviceInMultiRegion) {
5152
super(
52-
operatorContext, child, treeDeviceSchemaCacheManager, needUpdateCache, needUpdateNullEntry);
53+
operatorContext,
54+
child,
55+
treeDeviceSchemaCacheManager,
56+
needUpdateCache,
57+
needUpdateNullEntry,
58+
deviceInMultiRegion);
5359
this.seriesPath = seriesPath;
5460
this.devicePath = seriesPath.getDevicePath();
5561
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@ public AlignedUpdateViewPathLastCacheOperator(
4141
TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager,
4242
boolean needUpdateCache,
4343
boolean needUpdateNullEntry,
44-
String outputViewPath) {
44+
String outputViewPath,
45+
boolean deviceInMultiRegion) {
4546
super(
4647
operatorContext,
4748
child,
4849
seriesPath,
4950
treeDeviceSchemaCacheManager,
5051
needUpdateCache,
51-
needUpdateNullEntry);
52+
needUpdateNullEntry,
53+
deviceInMultiRegion);
5254
checkArgument(seriesPath.getMeasurementList().size() == 1);
5355
this.outputViewPath = outputViewPath;
5456
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public TsBlock next() throws Exception {
116116
return null;
117117
} else if (!tsBlock.isEmpty()) {
118118
LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock);
119+
return null;
119120
}
120121
} else {
121122
children.get(currentIndex).close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ private TsBlock buildResult() throws Exception {
153153

154154
while (keepGoing(start, maxRuntime, endIndex)) {
155155

156-
if (prepareData()) {
156+
prepareData();
157+
if (previousTsBlock == null) {
157158
return null;
158159
}
159160

@@ -179,21 +180,18 @@ private boolean keepGoing(long start, long maxRuntime, int endIndex) {
179180
&& !tsBlockBuilder.isFull();
180181
}
181182

182-
private boolean prepareData() throws Exception {
183+
private void prepareData() throws Exception {
183184
if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) {
184185
if (children.get(currentIndex).hasNextWithTimer()) {
185186
previousTsBlock = children.get(currentIndex).nextWithTimer();
186187
previousTsBlockIndex = 0;
187-
if (previousTsBlock == null) {
188-
return true;
189-
}
188+
return;
190189
} else {
191190
children.get(currentIndex).close();
192191
children.set(currentIndex, null);
193192
}
194193
currentIndex++;
195194
}
196-
return false;
197195
}
198196

199197
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,33 @@ public UpdateLastCacheOperator(
5353
TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager,
5454
boolean needUpdateCache,
5555
boolean isNeedUpdateNullEntry) {
56+
this(
57+
operatorContext,
58+
child,
59+
fullPath,
60+
dataType,
61+
treeDeviceSchemaCacheManager,
62+
needUpdateCache,
63+
isNeedUpdateNullEntry,
64+
true);
65+
}
66+
67+
public UpdateLastCacheOperator(
68+
OperatorContext operatorContext,
69+
Operator child,
70+
MeasurementPath fullPath,
71+
TSDataType dataType,
72+
TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager,
73+
boolean needUpdateCache,
74+
boolean isNeedUpdateNullEntry,
75+
boolean deviceInMultiRegion) {
5676
super(
5777
operatorContext,
5878
child,
5979
treeDeviceSchemaCacheManager,
6080
needUpdateCache,
61-
isNeedUpdateNullEntry);
81+
isNeedUpdateNullEntry,
82+
deviceInMultiRegion);
6283
this.fullPath = fullPath;
6384
this.dataType = dataType.name();
6485
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ public class Analysis implements IAnalysis {
114114
// map from device name to series/aggregation under this device
115115
private Set<Expression> sourceExpressions;
116116

117+
// In order to perform some optimization, when the source expression is
118+
// not used later, nothing will be placed in this structure.
119+
private boolean shouldHaveSourceExpression;
120+
117121
// input expressions of aggregations to be calculated
118122
private Set<Expression> sourceTransformExpressions = new HashSet<>();
119123

@@ -234,7 +238,9 @@ aggregation results last_value(temperature) and last_value(status), whereas buck
234238
// Key: non-writable view expression, Value: corresponding source expressions
235239
private Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap;
236240

237-
private Set<Expression> lastQueryBaseExpressions;
241+
private Map<IDeviceID, Map<String, Expression>> lastQueryOutputPathToSourceExpressionMap;
242+
243+
private Set<IDeviceID> deviceExistViewSet;
238244

239245
// header of result dataset
240246
private DatasetHeader respDatasetHeader;
@@ -619,6 +625,14 @@ public void setSourceExpressions(Set<Expression> sourceExpressions) {
619625
this.sourceExpressions = sourceExpressions;
620626
}
621627

628+
public void setShouldHaveSourceExpression(boolean shouldHaveSourceExpression) {
629+
this.shouldHaveSourceExpression = shouldHaveSourceExpression;
630+
}
631+
632+
public boolean shouldHaveSourceExpression() {
633+
return shouldHaveSourceExpression;
634+
}
635+
622636
public Set<Expression> getSourceTransformExpressions() {
623637
return sourceTransformExpressions;
624638
}
@@ -886,12 +900,21 @@ public void setTimeseriesOrderingForLastQuery(Ordering timeseriesOrderingForLast
886900
this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery;
887901
}
888902

889-
public Set<Expression> getLastQueryBaseExpressions() {
890-
return this.lastQueryBaseExpressions;
903+
public Map<IDeviceID, Map<String, Expression>> getLastQueryOutputPathToSourceExpressionMap() {
904+
return lastQueryOutputPathToSourceExpressionMap;
905+
}
906+
907+
public void setLastQueryOutputPathToSourceExpressionMap(
908+
Map<IDeviceID, Map<String, Expression>> lastQueryOutputPathToSourceExpressionMap) {
909+
this.lastQueryOutputPathToSourceExpressionMap = lastQueryOutputPathToSourceExpressionMap;
910+
}
911+
912+
public Set<IDeviceID> getDeviceExistViewSet() {
913+
return deviceExistViewSet;
891914
}
892915

893-
public void setLastQueryBaseExpressions(Set<Expression> lastQueryBaseExpressions) {
894-
this.lastQueryBaseExpressions = lastQueryBaseExpressions;
916+
public void setDeviceExistViewSet(Set<IDeviceID> deviceExistViewSet) {
917+
this.deviceExistViewSet = deviceExistViewSet;
895918
}
896919

897920
public Map<Expression, List<Expression>> getLastQueryNonWritableViewSourceExpressionMap() {

0 commit comments

Comments
 (0)