Skip to content

Commit 7f916bb

Browse files
authored
Finish window function query planning stage
1 parent 0996da2 commit 7f916bb

File tree

63 files changed

+4799
-139
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+4799
-139
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionIT.java

Lines changed: 413 additions & 0 deletions
Large diffs are not rendered by default.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ private TsBlock getFilterTsBlock(TsBlock input) {
171171
if (!hasNonMappableUDF) {
172172
// get result of calculated common sub expressions
173173
for (ColumnTransformer columnTransformer : commonTransformerList) {
174+
// CASE WHEN clause would clear all its cache
175+
// evaluate again to acquire cache
176+
columnTransformer.tryEvaluate();
174177
resultColumns.add(columnTransformer.getColumn());
175178
}
176179
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
3131

3232
import com.google.common.collect.ImmutableList;
33+
import com.google.common.util.concurrent.ListenableFuture;
3334
import org.apache.tsfile.block.column.Column;
3435
import org.apache.tsfile.common.conf.TSFileDescriptor;
3536
import org.apache.tsfile.enums.TSDataType;
@@ -134,6 +135,11 @@ public OperatorContext getOperatorContext() {
134135
return operatorContext;
135136
}
136137

138+
@Override
139+
public ListenableFuture<?> isBlocked() {
140+
return inputOperator.isBlocked();
141+
}
142+
137143
@Override
138144
public TsBlock next() throws Exception {
139145
long startTime = System.nanoTime();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process.window.function;
21+
22+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.CumeDistFunction;
23+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.DenseRankFunction;
24+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.NTileFunction;
25+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.PercentRankFunction;
26+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.RankFunction;
27+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank.RowNumberFunction;
28+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value.FirstValueFunction;
29+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value.LagFunction;
30+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value.LastValueFunction;
31+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value.LeadFunction;
32+
import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value.NthValueFunction;
33+
34+
import java.util.List;
35+
36+
public class WindowFunctionFactory {
37+
public static WindowFunction createBuiltinWindowFunction(
38+
String functionName, List<Integer> argumentChannels, boolean ignoreNulls) {
39+
switch (functionName) {
40+
case "nth_value":
41+
return new NthValueFunction(argumentChannels, ignoreNulls);
42+
case "first_value":
43+
return new FirstValueFunction(argumentChannels.get(0), ignoreNulls);
44+
case "last_value":
45+
return new LastValueFunction(argumentChannels.get(0), ignoreNulls);
46+
case "lead":
47+
return new LeadFunction(argumentChannels, ignoreNulls);
48+
case "lag":
49+
return new LagFunction(argumentChannels, ignoreNulls);
50+
case "rank":
51+
return new RankFunction();
52+
case "dense_rank":
53+
return new DenseRankFunction();
54+
case "row_number":
55+
return new RowNumberFunction();
56+
case "percent_rank":
57+
return new PercentRankFunction();
58+
case "cume_dist":
59+
return new CumeDistFunction();
60+
case "ntile":
61+
return new NTileFunction(argumentChannels.get(0));
62+
default:
63+
throw new UnsupportedOperationException(
64+
"Unsupported built-in window function name: " + functionName);
65+
}
66+
}
67+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/rank/NTileFunction.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import org.apache.tsfile.block.column.ColumnBuilder;
2525

2626
public class NTileFunction extends RankWindowFunction {
27-
private final int n;
27+
private final int nChannel;
2828

29-
public NTileFunction(int n) {
30-
this.n = n;
29+
public NTileFunction(int nChannel) {
30+
this.nChannel = nChannel;
3131
}
3232

3333
@Override
@@ -37,7 +37,12 @@ public void transform(
3737
int index,
3838
boolean isNewPeerGroup,
3939
int peerGroupCount) {
40-
builder.writeLong(bucket(n, index, partition.getPositionCount()) + 1);
40+
if (partition.isNull(nChannel, index)) {
41+
builder.appendNull();
42+
} else {
43+
long n = partition.getLong(nChannel, index);
44+
builder.writeLong(bucket(n, index, partition.getPositionCount()) + 1);
45+
}
4146
}
4247

4348
private long bucket(long buckets, int index, int count) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LagFunction.java

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,34 @@
2222
import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
2323

2424
import org.apache.tsfile.block.column.ColumnBuilder;
25+
import org.apache.tsfile.enums.TSDataType;
26+
import org.apache.tsfile.write.UnSupportedDataTypeException;
27+
28+
import java.util.List;
2529

2630
public class LagFunction extends ValueWindowFunction {
2731
private final int channel;
28-
private final Integer offset;
29-
private final Object defaultVal;
32+
private final int offsetChannel;
33+
private final int defaultValChannel;
3034
private final boolean ignoreNull;
3135

32-
public LagFunction(int channel, Integer offset, Object defaultVal, boolean ignoreNull) {
33-
this.channel = channel;
34-
this.offset = offset == null ? 1 : offset;
35-
this.defaultVal = defaultVal;
36+
public LagFunction(List<Integer> argumentChannels, boolean ignoreNull) {
37+
this.channel = argumentChannels.get(0);
38+
this.offsetChannel = argumentChannels.size() > 1 ? argumentChannels.get(1) : -1;
39+
this.defaultValChannel = argumentChannels.size() > 2 ? argumentChannels.get(2) : -1;
3640
this.ignoreNull = ignoreNull;
3741
}
3842

3943
@Override
4044
public void transform(
4145
Partition partition, ColumnBuilder builder, int index, int frameStart, int frameEnd) {
46+
if (offsetChannel >= 0 && partition.isNull(offsetChannel, index)) {
47+
builder.appendNull();
48+
return;
49+
}
50+
51+
int offset = offsetChannel >= 0 ? partition.getInt(offsetChannel, index) : 1;
52+
4253
int pos;
4354
if (ignoreNull) {
4455
int nonNullCount = 0;
@@ -63,13 +74,45 @@ public void transform(
6374
} else {
6475
builder.appendNull();
6576
}
66-
} else if (defaultVal != null) {
67-
builder.writeObject(defaultVal);
77+
} else if (defaultValChannel >= 0) {
78+
writeDefaultValue(partition, defaultValChannel, index, builder);
6879
} else {
6980
builder.appendNull();
7081
}
7182
}
7283

84+
private void writeDefaultValue(
85+
Partition partition, int defaultValChannel, int index, ColumnBuilder builder) {
86+
TSDataType dataType = builder.getDataType();
87+
switch (dataType) {
88+
case INT32:
89+
case DATE:
90+
builder.writeInt(partition.getInt(defaultValChannel, index));
91+
return;
92+
case INT64:
93+
case TIMESTAMP:
94+
builder.writeLong(partition.getLong(defaultValChannel, index));
95+
return;
96+
case FLOAT:
97+
builder.writeFloat(partition.getFloat(defaultValChannel, index));
98+
return;
99+
case DOUBLE:
100+
builder.writeDouble(partition.getDouble(defaultValChannel, index));
101+
return;
102+
case BOOLEAN:
103+
builder.writeBoolean(partition.getBoolean(defaultValChannel, index));
104+
return;
105+
case TEXT:
106+
case STRING:
107+
case BLOB:
108+
builder.writeBinary(partition.getBinary(defaultValChannel, index));
109+
return;
110+
default:
111+
throw new UnSupportedDataTypeException(
112+
"Unsupported default value's data type in Lag: " + dataType);
113+
}
114+
}
115+
73116
@Override
74117
public boolean needFrame() {
75118
return false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LeadFunction.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,34 @@
2222
import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
2323

2424
import org.apache.tsfile.block.column.ColumnBuilder;
25+
import org.apache.tsfile.enums.TSDataType;
26+
import org.apache.tsfile.write.UnSupportedDataTypeException;
27+
28+
import java.util.List;
2529

2630
public class LeadFunction extends ValueWindowFunction {
2731
private final int channel;
28-
private final Integer offset;
29-
private final Integer defaultVal;
32+
private final int offsetChannel;
33+
private final int defaultValChannel;
3034
private final boolean ignoreNull;
3135

32-
public LeadFunction(int channel, Integer offset, Integer defaultVal, boolean ignoreNull) {
33-
this.channel = channel;
34-
this.offset = offset == null ? 1 : offset;
35-
this.defaultVal = defaultVal;
36+
public LeadFunction(List<Integer> argumentChannels, boolean ignoreNull) {
37+
this.channel = argumentChannels.get(0);
38+
this.offsetChannel = argumentChannels.size() > 1 ? argumentChannels.get(1) : -1;
39+
this.defaultValChannel = argumentChannels.size() > 2 ? argumentChannels.get(2) : -1;
3640
this.ignoreNull = ignoreNull;
3741
}
3842

3943
@Override
4044
public void transform(
4145
Partition partition, ColumnBuilder builder, int index, int frameStart, int frameEnd) {
46+
if (offsetChannel >= 0 && partition.isNull(offsetChannel, index)) {
47+
builder.appendNull();
48+
return;
49+
}
50+
4251
int length = partition.getPositionCount();
52+
int offset = offsetChannel >= 0 ? partition.getInt(offsetChannel, index) : 1;
4353

4454
int pos;
4555
if (ignoreNull) {
@@ -65,13 +75,45 @@ public void transform(
6575
} else {
6676
builder.appendNull();
6777
}
68-
} else if (defaultVal != null) {
69-
builder.writeObject(defaultVal);
78+
} else if (defaultValChannel >= 0) {
79+
writeDefaultValue(partition, defaultValChannel, index, builder);
7080
} else {
7181
builder.appendNull();
7282
}
7383
}
7484

85+
private void writeDefaultValue(
86+
Partition partition, int defaultValChannel, int index, ColumnBuilder builder) {
87+
TSDataType dataType = builder.getDataType();
88+
switch (dataType) {
89+
case INT32:
90+
case DATE:
91+
builder.writeInt(partition.getInt(defaultValChannel, index));
92+
return;
93+
case INT64:
94+
case TIMESTAMP:
95+
builder.writeLong(partition.getLong(defaultValChannel, index));
96+
return;
97+
case FLOAT:
98+
builder.writeFloat(partition.getFloat(defaultValChannel, index));
99+
return;
100+
case DOUBLE:
101+
builder.writeDouble(partition.getDouble(defaultValChannel, index));
102+
return;
103+
case BOOLEAN:
104+
builder.writeBoolean(partition.getBoolean(defaultValChannel, index));
105+
return;
106+
case TEXT:
107+
case STRING:
108+
case BLOB:
109+
builder.writeBinary(partition.getBinary(defaultValChannel, index));
110+
return;
111+
default:
112+
throw new UnSupportedDataTypeException(
113+
"Unsupported default value's data type in Lag: " + dataType);
114+
}
115+
}
116+
75117
@Override
76118
public boolean needFrame() {
77119
return false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/NthValueFunction.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,33 +23,36 @@
2323

2424
import org.apache.tsfile.block.column.ColumnBuilder;
2525

26+
import java.util.List;
27+
2628
public class NthValueFunction extends ValueWindowFunction {
27-
private final int n;
28-
private final int channel;
29+
private final int valueChannel;
30+
private final int nChannel;
2931
private final boolean ignoreNull;
3032

31-
public NthValueFunction(int n, int channel, boolean ignoreNull) {
32-
this.n = n;
33-
this.channel = channel;
33+
public NthValueFunction(List<Integer> argumentChannels, boolean ignoreNull) {
34+
this.valueChannel = argumentChannels.get(0);
35+
this.nChannel = argumentChannels.get(1);
3436
this.ignoreNull = ignoreNull;
3537
}
3638

3739
@Override
3840
public void transform(
3941
Partition partition, ColumnBuilder builder, int index, int frameStart, int frameEnd) {
4042
// Empty frame
41-
if (frameStart < 0) {
43+
if (frameStart < 0 || partition.isNull(nChannel, index)) {
4244
builder.appendNull();
4345
return;
4446
}
4547

4648
int pos;
49+
int n = partition.getInt(nChannel, index);
4750
if (ignoreNull) {
4851
// Handle nulls
4952
pos = frameStart;
5053
int nonNullCount = 0;
5154
while (pos <= frameEnd) {
52-
if (!partition.isNull(channel, pos)) {
55+
if (!partition.isNull(valueChannel, pos)) {
5356
nonNullCount++;
5457
if (nonNullCount == n) {
5558
break;
@@ -59,7 +62,7 @@ public void transform(
5962
}
6063

6164
if (pos <= frameEnd) {
62-
partition.writeTo(builder, channel, pos);
65+
partition.writeTo(builder, valueChannel, pos);
6366
} else {
6467
builder.appendNull();
6568
}
@@ -69,8 +72,8 @@ public void transform(
6972
// n starts with 1
7073
pos = frameStart + n - 1;
7174
if (pos <= frameEnd) {
72-
if (!partition.isNull(channel, pos)) {
73-
partition.writeTo(builder, channel, pos);
75+
if (!partition.isNull(valueChannel, pos)) {
76+
partition.writeTo(builder, valueChannel, pos);
7477
} else {
7578
builder.appendNull();
7679
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public PartitionExecutor(
111111
frame = new RangeFrame(partition, frameInfo, sortedColumns, peerGroupComparator);
112112
break;
113113
case ROWS:
114-
frame = new RowsFrame(partition, frameInfo, partitionStart, partitionEnd);
114+
frame = new RowsFrame(partition, frameInfo);
115115
break;
116116
case GROUPS:
117117
frame =

0 commit comments

Comments
 (0)