Skip to content

Commit ea7e2a6

Browse files
authored
Fix cast of TimeColumn after deserialize
1 parent 2df8710 commit ea7e2a6

File tree

12 files changed

+160
-92
lines changed

12 files changed

+160
-92
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it.udf;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.ClusterIT;
25+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
34+
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
35+
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
36+
37+
@RunWith(IoTDBTestRunner.class)
38+
@Category({LocalStandaloneIT.class, ClusterIT.class})
39+
public class IoTDBUDFIntermediateBlockSerdeIT {
40+
private static final String[] SQLs =
41+
new String[] {
42+
"insert into root.sg.d1(time, s1) values (1,1)",
43+
"insert into root.sg.d1(time, s1) values (2,2)",
44+
"insert into root.sg.d1(time, s1) values (3,3)",
45+
"insert into root.sg.d1(time, s1) values (4,4)",
46+
"insert into root.sg.d1(time, s1) values (5,5)",
47+
"insert into root.sg.d1(time, s1) values (6,6)"
48+
};
49+
50+
@BeforeClass
51+
public static void setUp() throws Exception {
52+
EnvFactory.getEnv().getConfig().getCommonConfig().setUdfMemoryBudgetInMB(0.0001f);
53+
EnvFactory.getEnv().initClusterEnvironment();
54+
prepareData(SQLs);
55+
}
56+
57+
@AfterClass
58+
public static void tearDown() throws Exception {
59+
EnvFactory.getEnv().cleanClusterEnvironment();
60+
}
61+
62+
@Test
63+
public void testM4() {
64+
String[] expectedHeader =
65+
new String[] {
66+
TIMESTAMP_STR, "EQUAL_SIZE_BUCKET_M4_SAMPLE(root.sg.d1.s1, \"proportion\"=\"1\")"
67+
};
68+
String[] retArray = new String[] {"1,1.0,", "2,2.0,", "3,3.0,", "4,4.0,", "5,5.0,", "6,6.0,"};
69+
resultSetEqualWithDescOrderTest(
70+
"select EQUAL_SIZE_BUCKET_M4_SAMPLE(s1,'proportion'='1') from root.sg.d1",
71+
expectedHeader,
72+
retArray);
73+
}
74+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,27 +2468,31 @@ private void loadUDFProps(TrimProperties properties) {
24682468

24692469
String readerTransformerCollectorMemoryProportion =
24702470
properties.getProperty("udf_reader_transformer_collector_memory_proportion");
2471+
String[] proportions;
24712472
if (readerTransformerCollectorMemoryProportion != null) {
2472-
String[] proportions = readerTransformerCollectorMemoryProportion.split(":");
2473-
int proportionSum = 0;
2474-
for (String proportion : proportions) {
2475-
proportionSum += Integer.parseInt(proportion.trim());
2476-
}
2477-
float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
2478-
try {
2479-
conf.setUdfReaderMemoryBudgetInMB(
2480-
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
2481-
conf.setUdfTransformerMemoryBudgetInMB(
2482-
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
2483-
conf.setUdfCollectorMemoryBudgetInMB(
2484-
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
2485-
} catch (Exception e) {
2486-
throw new RuntimeException(
2487-
"Each subsection of configuration item udf_reader_transformer_collector_memory_proportion"
2488-
+ " should be an integer, which is "
2489-
+ readerTransformerCollectorMemoryProportion,
2490-
e);
2491-
}
2473+
proportions = readerTransformerCollectorMemoryProportion.split(":");
2474+
} else {
2475+
// Make the default proportion is 1:1:1
2476+
proportions = new String[] {"1", "1", "1"};
2477+
}
2478+
int proportionSum = 0;
2479+
for (String proportion : proportions) {
2480+
proportionSum += Integer.parseInt(proportion.trim());
2481+
}
2482+
float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
2483+
try {
2484+
conf.setUdfReaderMemoryBudgetInMB(
2485+
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
2486+
conf.setUdfTransformerMemoryBudgetInMB(
2487+
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
2488+
conf.setUdfCollectorMemoryBudgetInMB(
2489+
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
2490+
} catch (Exception e) {
2491+
throw new RuntimeException(
2492+
"Each subsection of configuration item udf_reader_transformer_collector_memory_proportion"
2493+
+ " should be an integer, which is "
2494+
+ readerTransformerCollectorMemoryProportion,
2495+
e);
24922496
}
24932497
}
24942498

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.tsfile.block.column.Column;
2323
import org.apache.tsfile.read.common.block.TsBlock;
24-
import org.apache.tsfile.read.common.block.column.TimeColumn;
2524

2625
public class SessionWindow implements IWindow {
2726

@@ -87,19 +86,19 @@ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
8786

8887
@Override
8988
public boolean contains(Column column) {
90-
TimeColumn timeColumn = (TimeColumn) column;
91-
92-
long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
93-
long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());
89+
long columnStartTime = column.getLong(0);
90+
long columnEndTime = column.getLong(column.getPositionCount() - 1);
91+
long minTime = Math.min(columnStartTime, columnEndTime);
92+
long maxTime = Math.max(columnStartTime, columnEndTime);
9493

9594
boolean contains =
96-
Math.abs(column.getLong(0) - lastTsBlockTime) < timeInterval
95+
Math.abs(columnStartTime - lastTsBlockTime) < timeInterval
9796
&& maxTime - minTime <= timeInterval;
9897
if (contains) {
9998
if (!initializedTimeValue) {
10099
startTime = Long.MAX_VALUE;
101100
endTime = Long.MIN_VALUE;
102-
lastTsBlockTime = column.getLong(0);
101+
lastTsBlockTime = columnStartTime;
103102
timeValue = ascending ? maxTime : minTime;
104103
initializedTimeValue = true;
105104
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/TimeWindow.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.tsfile.block.column.Column;
2323
import org.apache.tsfile.read.common.TimeRange;
2424
import org.apache.tsfile.read.common.block.TsBlock;
25-
import org.apache.tsfile.read.common.block.column.TimeColumn;
2625

2726
public class TimeWindow implements IWindow {
2827

@@ -62,10 +61,10 @@ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
6261

6362
@Override
6463
public boolean contains(Column column) {
65-
TimeColumn timeColumn = (TimeColumn) column;
66-
67-
long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
68-
long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());
64+
long startTime = column.getLong(0);
65+
long endTime = column.getLong(column.getPositionCount() - 1);
66+
long minTime = Math.min(startTime, endTime);
67+
long maxTime = Math.max(startTime, endTime);
6968

7069
return curTimeRange.contains(minTime, maxTime);
7170
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/MultiInputLayer.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.tsfile.enums.TSDataType;
4343
import org.apache.tsfile.read.common.block.TsBlock;
4444
import org.apache.tsfile.read.common.block.TsBlockBuilder;
45-
import org.apache.tsfile.read.common.block.column.TimeColumn;
4645
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
4746
import org.slf4j.Logger;
4847
import org.slf4j.LoggerFactory;
@@ -143,8 +142,8 @@ private YieldableState updateInputColumns() throws Exception {
143142
if (layerReaders[i].isConstantPointReader()) {
144143
inputTVColumnsList[i] = new TVColumns(columns[0]);
145144
} else {
146-
inputTVColumnsList[i] = new TVColumns((TimeColumn) columns[1], columns[0]);
147-
timeHeap.add(((TimeColumn) columns[1]).getStartTime());
145+
inputTVColumnsList[i] = new TVColumns(columns[1], columns[0]);
146+
timeHeap.add(columns[1].getLong(0));
148147
}
149148

150149
currentConsumedIndexes[i] = 0;
@@ -354,10 +353,10 @@ protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
354353
private long currentEndTime = Long.MAX_VALUE;
355354

356355
private final RowListForwardIterator beginIterator = rowRecordList.constructIterator();
357-
private TimeColumn cachedBeginTimeColumn;
356+
private Column cachedBeginTimeColumn;
358357
private int cachedBeginConsumed;
359358

360-
private TimeColumn cachedEndTimeColumn;
359+
private Column cachedEndTimeColumn;
361360
private int cachedEndConsumed;
362361

363362
@Override
@@ -370,7 +369,7 @@ public YieldableState yield() throws Exception {
370369
}
371370

372371
Column[] columns = udfInputDataSet.currentBlock();
373-
TimeColumn times = (TimeColumn) columns[columns.length - 1];
372+
Column times = columns[columns.length - 1];
374373

375374
rowRecordList.put(columns);
376375

@@ -379,11 +378,12 @@ public YieldableState yield() throws Exception {
379378
if (nextWindowTimeBegin == Long.MIN_VALUE) {
380379
// display window begin should be set to the same as the min timestamp of the query
381380
// result set
382-
nextWindowTimeBegin = cachedEndTimeColumn.getStartTime();
381+
nextWindowTimeBegin = cachedEndTimeColumn.getLong(0);
383382
}
384383
hasAtLeastOneRow = rowRecordList.size() != 0;
385384
if (hasAtLeastOneRow) {
386-
currentEndTime = cachedEndTimeColumn.getEndTime();
385+
currentEndTime =
386+
cachedEndTimeColumn.getLong(cachedEndTimeColumn.getPositionCount() - 1);
387387
}
388388
isFirstIteration = false;
389389
}
@@ -407,10 +407,10 @@ public YieldableState yield() throws Exception {
407407
}
408408
// Generate data
409409
Column[] columns = udfInputDataSet.currentBlock();
410-
TimeColumn times = (TimeColumn) columns[columns.length - 1];
410+
Column times = columns[columns.length - 1];
411411
// Put data into container
412412
rowRecordList.put(columns);
413-
currentEndTime = times.getEndTime();
413+
currentEndTime = times.getLong(times.getPositionCount() - 1);
414414
// Increase nextIndexEnd
415415
nextIndexEnd += cachedEndTimeColumn.getPositionCount() - cachedEndConsumed;
416416
// Update cache
@@ -446,7 +446,7 @@ public YieldableState yield() throws Exception {
446446

447447
cachedBeginConsumed = 0;
448448
Column[] columns = beginIterator.currentBlock();
449-
cachedBeginTimeColumn = (TimeColumn) columns[columns.length - 1];
449+
cachedBeginTimeColumn = columns[columns.length - 1];
450450
} else {
451451
// No more data
452452
// Set nextIndexBegin to list's size
@@ -456,7 +456,8 @@ public YieldableState yield() throws Exception {
456456
}
457457

458458
if ((nextIndexEnd == nextIndexBegin)
459-
&& nextWindowTimeEnd < cachedEndTimeColumn.getEndTime()) {
459+
&& nextWindowTimeEnd
460+
< cachedEndTimeColumn.getLong(cachedEndTimeColumn.getPositionCount() - 1)) {
460461
window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
461462
return YieldableState.YIELDABLE;
462463
}
@@ -514,7 +515,7 @@ protected LayerRowWindowReader constructRowSessionTimeWindowReader(
514515
private int nextIndexBegin = 0;
515516
private int nextIndexEnd = 0;
516517

517-
private TimeColumn cachedTimes;
518+
private Column cachedTimes;
518519
private int cachedConsumed;
519520

520521
@Override
@@ -557,7 +558,7 @@ public YieldableState yield() throws Exception {
557558
}
558559

559560
if (!findWindow) {
560-
if (cachedTimes.getEndTime() < displayWindowEnd) {
561+
if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) < displayWindowEnd) {
561562
YieldableState state = yieldAndCache();
562563
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
563564
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -586,12 +587,12 @@ private YieldableState yieldInFirstIteration() throws Exception {
586587
}
587588
}
588589
// Initialize essential information
589-
nextWindowTimeBegin = Math.max(displayWindowBegin, cachedTimes.getStartTime());
590+
nextWindowTimeBegin = Math.max(displayWindowBegin, cachedTimes.getLong(0));
590591
hasAtLeastOneRow = rowRecordList.size() != 0;
591592
isFirstIteration = false;
592593

593594
// Set initial nextIndexBegin
594-
long currentEndTime = cachedTimes.getEndTime();
595+
long currentEndTime = cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
595596
// Find corresponding block
596597
while (currentEndTime < nextWindowTimeBegin) {
597598
// Consume all data
@@ -625,7 +626,7 @@ private YieldableState yieldAndCache() throws Exception {
625626
return state;
626627
}
627628
Column[] columns = udfInputDataSet.currentBlock();
628-
TimeColumn times = (TimeColumn) columns[columns.length - 1];
629+
Column times = columns[columns.length - 1];
629630

630631
rowRecordList.put(columns);
631632

0 commit comments

Comments
 (0)