Skip to content

Commit 772bab4

Browse files
authored
Fix display of Blob type in last query
1 parent 08b4f8f commit 772bab4

File tree

9 files changed

+63
-19
lines changed

9 files changed

+63
-19
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void testLastQuerySortWithLimit() {
183183
}
184184

185185
@Test
186-
public void testLastQuerySortWithBlobType() {
186+
public void testLastQueryWithBlobType() {
187187
String[] expectedHeader =
188188
new String[] {TIMESTAMP_STR, TIMESERIES_STR, VALUE_STR, DATA_TYPE_STR};
189189
String[] retArray =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanContext.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public class LocalExecutionPlanContext {
8383
private List<TSDataType> cachedDataTypes;
8484

8585
// left is cached last value in last query
86-
// right is full path for each cached last value
87-
private List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList;
86+
// right is full path and DataType for each cached last value
87+
private List<Pair<TimeValuePair, Pair<Binary, TSDataType>>> cachedLastValueAndPathList;
8888

8989
// whether we need to update last cache
9090
private boolean needUpdateLastCache;
@@ -274,15 +274,18 @@ public void setNeedUpdateLastCache(boolean needUpdateLastCache) {
274274
this.needUpdateLastCache = needUpdateLastCache;
275275
}
276276

277-
public void addCachedLastValue(TimeValuePair timeValuePair, String fullPath) {
277+
public void addCachedLastValue(
278+
TimeValuePair timeValuePair, String fullPath, TSDataType dataType) {
278279
if (cachedLastValueAndPathList == null) {
279280
cachedLastValueAndPathList = new ArrayList<>();
280281
}
281282
cachedLastValueAndPathList.add(
282-
new Pair<>(timeValuePair, new Binary(fullPath, TSFileConfig.STRING_CHARSET)));
283+
new Pair<>(
284+
timeValuePair,
285+
new Pair<>(new Binary(fullPath, TSFileConfig.STRING_CHARSET), dataType)));
283286
}
284287

285-
public List<Pair<TimeValuePair, Binary>> getCachedLastValueAndPathList() {
288+
public List<Pair<TimeValuePair, Pair<Binary, TSDataType>>> getCachedLastValueAndPathList() {
286289
return cachedLastValueAndPathList;
287290
}
288291

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrderin
257257
sourceExpression.isViewExpression()
258258
? sourceExpression.getViewPath().getFullPath()
259259
: null;
260+
TSDataType outputViewPathType =
261+
outputViewPath == null ? null : selectedPath.getSeriesType();
260262

261263
PartialPath devicePath = selectedPath.getDevicePath();
262264
// For expression with view path, we do not use the deviceId in Map.Entry because it is a
@@ -268,7 +270,8 @@ public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrderin
268270
devicePath,
269271
selectedPath.isUnderAlignedEntity(),
270272
Collections.singletonList(selectedPath.getMeasurementSchema()),
271-
outputViewPath);
273+
outputViewPath,
274+
outputViewPathType);
272275
this.context.reserveMemoryForFrontEnd(memCost);
273276
}
274277
} else {
@@ -291,6 +294,7 @@ public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrderin
291294
devicePath,
292295
aligned,
293296
measurementSchemas,
297+
null,
294298
null);
295299
this.context.reserveMemoryForFrontEnd(memCost);
296300
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3047,9 +3047,11 @@ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanCon
30473047
}
30483048
} else { // cached last value is satisfied, put it into LastCacheScanOperator
30493049
if (node.getOutputViewPath() != null) {
3050-
context.addCachedLastValue(timeValuePair, node.getOutputViewPath());
3050+
context.addCachedLastValue(
3051+
timeValuePair, node.getOutputViewPath(), node.getOutputViewPathType());
30513052
} else {
3052-
context.addCachedLastValue(timeValuePair, measurementPath.getFullPath());
3053+
context.addCachedLastValue(
3054+
timeValuePair, measurementPath.getFullPath(), measurementSchema.getType());
30533055
}
30543056
}
30553057
}
@@ -3097,7 +3099,7 @@ public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext con
30973099
.filter(Objects::nonNull)
30983100
.collect(Collectors.toList());
30993101

3100-
List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList =
3102+
List<Pair<TimeValuePair, Pair<Binary, TSDataType>>> cachedLastValueAndPathList =
31013103
context.getCachedLastValueAndPathList();
31023104

31033105
int initSize = cachedLastValueAndPathList != null ? cachedLastValueAndPathList.size() : 0;
@@ -3109,9 +3111,9 @@ public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext con
31093111
LastQueryUtil.appendLastValueRespectBlob(
31103112
builder,
31113113
timeValuePair.getTimestamp(),
3112-
cachedLastValueAndPathList.get(i).right,
3114+
cachedLastValueAndPathList.get(i).right.getLeft(),
31133115
timeValuePair.getValue(),
3114-
timeValuePair.getValue().getDataType().name());
3116+
cachedLastValueAndPathList.get(i).right.getRight().name());
31153117
}
31163118
OperatorContext operatorContext =
31173119
context
@@ -3127,7 +3129,8 @@ public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext con
31273129
node.getTimeseriesOrdering() == ASC ? ASC_BINARY_COMPARATOR : DESC_BINARY_COMPARATOR;
31283130
// sort values from last cache
31293131
if (initSize > 0) {
3130-
cachedLastValueAndPathList.sort(Comparator.comparing(Pair::getRight, comparator));
3132+
cachedLastValueAndPathList.sort(
3133+
Comparator.comparing(pair -> pair.getRight().getLeft(), comparator));
31313134
}
31323135

31333136
TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(initSize);
@@ -3136,9 +3139,9 @@ public Operator visitLastQuery(LastQueryNode node, LocalExecutionPlanContext con
31363139
LastQueryUtil.appendLastValueRespectBlob(
31373140
builder,
31383141
timeValuePair.getTimestamp(),
3139-
cachedLastValueAndPathList.get(i).right,
3142+
cachedLastValueAndPathList.get(i).right.getLeft(),
31403143
timeValuePair.getValue(),
3141-
timeValuePair.getValue().getDataType().name());
3144+
cachedLastValueAndPathList.get(i).right.getRight().name());
31423145
}
31433146

31443147
OperatorContext operatorContext =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
3030
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
3131

32+
import org.apache.tsfile.enums.TSDataType;
3233
import org.apache.tsfile.utils.RamUsageEstimator;
3334
import org.apache.tsfile.utils.ReadWriteIOUtils;
3435
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -89,7 +90,8 @@ public long addDeviceLastQueryScanNode(
8990
PartialPath devicePath,
9091
boolean aligned,
9192
List<IMeasurementSchema> measurementSchemas,
92-
String outputViewPath) {
93+
String outputViewPath,
94+
TSDataType outputViewPathType) {
9395
List<Integer> idxList = new ArrayList<>(measurementSchemas.size());
9496
for (IMeasurementSchema measurementSchema : measurementSchemas) {
9597
int idx =
@@ -103,7 +105,13 @@ public long addDeviceLastQueryScanNode(
103105
}
104106
LastQueryScanNode scanNode =
105107
new LastQueryScanNode(
106-
id, devicePath, aligned, idxList, outputViewPath, globalMeasurementSchemaList);
108+
id,
109+
devicePath,
110+
aligned,
111+
idxList,
112+
outputViewPath,
113+
outputViewPathType,
114+
globalMeasurementSchemaList);
107115
children.add(scanNode);
108116
return scanNode.ramBytesUsed();
109117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
3232

3333
import com.google.common.collect.ImmutableList;
34+
import org.apache.tsfile.enums.TSDataType;
3435
import org.apache.tsfile.utils.RamUsageEstimator;
3536
import org.apache.tsfile.utils.ReadWriteIOUtils;
3637
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -64,6 +65,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode {
6465
private List<IMeasurementSchema> globalMeasurementSchemaList;
6566

6667
private final String outputViewPath;
68+
private final TSDataType outputViewPathType;
6769

6870
// The id of DataRegion where the node will run
6971
private TRegionReplicaSet regionReplicaSet;
@@ -75,12 +77,14 @@ public LastQueryScanNode(
7577
boolean aligned,
7678
List<Integer> indexOfMeasurementSchemas,
7779
String outputViewPath,
80+
TSDataType outputViewPathType,
7881
List<IMeasurementSchema> globalMeasurementSchemaList) {
7982
super(id, new AtomicInteger(1));
8083
this.aligned = aligned;
8184
this.devicePath = devicePath;
8285
this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
8386
this.outputViewPath = outputViewPath;
87+
this.outputViewPathType = outputViewPathType;
8488
this.globalMeasurementSchemaList = globalMeasurementSchemaList;
8589
}
8690

@@ -90,14 +94,16 @@ public LastQueryScanNode(
9094
boolean aligned,
9195
List<Integer> indexOfMeasurementSchemas,
9296
AtomicInteger dataNodeSeriesScanNum,
93-
String outputViewPath) {
97+
String outputViewPath,
98+
TSDataType outputViewPathType) {
9499
this(
95100
id,
96101
devicePath,
97102
aligned,
98103
indexOfMeasurementSchemas,
99104
dataNodeSeriesScanNum,
100105
outputViewPath,
106+
outputViewPathType,
101107
null);
102108
}
103109

@@ -108,12 +114,14 @@ public LastQueryScanNode(
108114
List<Integer> indexOfMeasurementSchemas,
109115
AtomicInteger dataNodeSeriesScanNum,
110116
String outputViewPath,
117+
TSDataType outputViewPathType,
111118
List<IMeasurementSchema> globalMeasurementSchemaList) {
112119
super(id, dataNodeSeriesScanNum);
113120
this.aligned = aligned;
114121
this.devicePath = devicePath;
115122
this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
116123
this.outputViewPath = outputViewPath;
124+
this.outputViewPathType = outputViewPathType;
117125
this.globalMeasurementSchemaList = globalMeasurementSchemaList;
118126
}
119127

@@ -124,6 +132,7 @@ public LastQueryScanNode(
124132
List<Integer> indexOfMeasurementSchemas,
125133
AtomicInteger dataNodeSeriesScanNum,
126134
String outputViewPath,
135+
TSDataType outputViewPathType,
127136
TRegionReplicaSet regionReplicaSet,
128137
boolean deviceInMultiRegion,
129138
List<IMeasurementSchema> globalMeasurementSchemaList) {
@@ -132,6 +141,7 @@ public LastQueryScanNode(
132141
this.aligned = aligned;
133142
this.indexOfMeasurementSchemas = indexOfMeasurementSchemas;
134143
this.outputViewPath = outputViewPath;
144+
this.outputViewPathType = outputViewPathType;
135145
this.regionReplicaSet = regionReplicaSet;
136146
this.deviceInMultiRegion = deviceInMultiRegion;
137147
this.globalMeasurementSchemaList = globalMeasurementSchemaList;
@@ -162,6 +172,10 @@ public String getOutputViewPath() {
162172
return outputViewPath;
163173
}
164174

175+
public TSDataType getOutputViewPathType() {
176+
return outputViewPathType;
177+
}
178+
165179
public String getOutputSymbolForSort() {
166180
if (outputViewPath != null) {
167181
return outputViewPath;
@@ -196,6 +210,7 @@ public PlanNode clone() {
196210
indexOfMeasurementSchemas,
197211
getDataNodeSeriesScanNum(),
198212
outputViewPath,
213+
outputViewPathType,
199214
regionReplicaSet,
200215
deviceInMultiRegion,
201216
globalMeasurementSchemaList);
@@ -226,6 +241,7 @@ public boolean equals(Object o) {
226241
&& Objects.equals(aligned, that.aligned)
227242
&& Objects.equals(indexOfMeasurementSchemas, that.indexOfMeasurementSchemas)
228243
&& Objects.equals(outputViewPath, that.outputViewPath)
244+
&& Objects.equals(outputViewPathType, that.outputViewPathType)
229245
&& Objects.equals(regionReplicaSet, that.regionReplicaSet);
230246
}
231247

@@ -275,6 +291,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
275291
ReadWriteIOUtils.write(outputViewPath == null, byteBuffer);
276292
if (outputViewPath != null) {
277293
ReadWriteIOUtils.write(outputViewPath, byteBuffer);
294+
ReadWriteIOUtils.write(outputViewPathType, byteBuffer);
278295
}
279296
ReadWriteIOUtils.write(deviceInMultiRegion, byteBuffer);
280297
}
@@ -292,6 +309,7 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
292309
ReadWriteIOUtils.write(outputViewPath == null, stream);
293310
if (outputViewPath != null) {
294311
ReadWriteIOUtils.write(outputViewPath, stream);
312+
ReadWriteIOUtils.write(outputViewPathType, stream);
295313
}
296314
ReadWriteIOUtils.write(deviceInMultiRegion, stream);
297315
}
@@ -308,6 +326,7 @@ public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
308326
int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer);
309327
boolean isNull = ReadWriteIOUtils.readBool(byteBuffer);
310328
String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer);
329+
TSDataType dataType = isNull ? null : ReadWriteIOUtils.readDataType(byteBuffer);
311330
boolean deviceInMultiRegion = ReadWriteIOUtils.readBool(byteBuffer);
312331
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
313332
return new LastQueryScanNode(
@@ -317,6 +336,7 @@ public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
317336
measurementSchemas,
318337
new AtomicInteger(dataNodeSeriesScanNum),
319338
outputPathSymbol,
339+
dataType,
320340
null,
321341
deviceInMultiRegion,
322342
null);

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext
198198
selectPath.getDevicePath(),
199199
selectPath.isUnderAlignedEntity(),
200200
Collections.singletonList(selectPath.getMeasurementSchema()),
201+
null,
201202
null);
202203
}
203204

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void testLastQuery() {
8989
d1s1Path.getDevicePath(),
9090
d1s1Path.isUnderAlignedEntity(),
9191
measurementSchemas,
92+
null,
9293
null);
9394

9495
measurementSchemas =
@@ -102,11 +103,12 @@ public void testLastQuery() {
102103
d2s1Path.getDevicePath(),
103104
d2s1Path.isUnderAlignedEntity(),
104105
measurementSchemas,
106+
null,
105107
null);
106108

107109
AlignedPath aPath = (AlignedPath) schemaMap.get("root.sg.d2.a");
108110
lastQueryNode.addDeviceLastQueryScanNode(
109-
queryId.genPlanNodeId(), aPath.getDevicePath(), true, aPath.getSchemaList(), null);
111+
queryId.genPlanNodeId(), aPath.getDevicePath(), true, aPath.getSchemaList(), null, null);
110112

111113
PlanNode actualPlan = parseSQLToPlanNode(sql);
112114
Assert.assertEquals(actualPlan, lastQueryNode);
@@ -132,6 +134,7 @@ public void testLastQuerySortWithLimit() {
132134
s3Path.getDevicePath(),
133135
s3Path.isUnderAlignedEntity(),
134136
measurementSchemas,
137+
null,
135138
null);
136139

137140
SortNode sortNode =

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public void test() throws IllegalPathException {
4444
true,
4545
Arrays.asList(0, 1),
4646
null,
47+
null,
4748
Arrays.asList(
4849
new MeasurementSchema("s1", TSDataType.INT32),
4950
new MeasurementSchema("s0", TSDataType.BOOLEAN)));
@@ -59,6 +60,7 @@ public void test() throws IllegalPathException {
5960
false,
6061
Arrays.asList(0, 1),
6162
null,
63+
null,
6264
Arrays.asList(
6365
new MeasurementSchema("s1", TSDataType.INT32),
6466
new MeasurementSchema("s0", TSDataType.BOOLEAN)));

0 commit comments

Comments
 (0)