Skip to content

Commit 3f96ada

Browse files
authored
Tumble & Cumulate Windows TVFs (apache#15354)
Tumble & Cumulate Windows TVFs
1 parent 66b1997 commit 3f96ada

File tree

4 files changed

+393
-0
lines changed

4 files changed

+393
-0
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.sql.Connection;
3434
import java.sql.Statement;
3535

36+
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
3637
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
3738
import static org.junit.Assert.fail;
3839

@@ -226,4 +227,108 @@ public void testCapacityFunction() {
226227
retArray,
227228
DATABASE_NAME);
228229
}
230+
231+
@Test
232+
public void testTumbleFunction() {
233+
// TUMBLE (10m)
234+
String[] expectedHeader =
235+
new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"};
236+
String[] retArray =
237+
new String[] {
238+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
239+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
240+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
241+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
242+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
243+
"2021-01-01T09:10:00.000Z,2021-01-01T09:20:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
244+
};
245+
tableResultSetEqualTest(
246+
"SELECT * FROM TUMBLE(DATA => bid, TIMECOL => 'time', SIZE => 10m) ORDER BY stock_id, time",
247+
expectedHeader,
248+
retArray,
249+
DATABASE_NAME);
250+
251+
// TUMBLE (10m) + GROUP BY
252+
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
253+
retArray =
254+
new String[] {
255+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,AAPL,305.0,",
256+
"2021-01-01T09:00:00.000Z,2021-01-01T09:10:00.000Z,TESL,402.0,",
257+
"2021-01-01T09:10:00.000Z,2021-01-01T09:20:00.000Z,TESL,195.0,",
258+
};
259+
tableResultSetEqualTest(
260+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM TUMBLE(DATA => bid, TIMECOL => 'time', SIZE => 10m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
261+
expectedHeader,
262+
retArray,
263+
DATABASE_NAME);
264+
265+
// TUMBLE (1h) + GROUP BY
266+
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
267+
retArray =
268+
new String[] {
269+
"2021-01-01T09:00:00.000Z,2021-01-01T10:00:00.000Z,AAPL,305.0,",
270+
"2021-01-01T09:00:00.000Z,2021-01-01T10:00:00.000Z,TESL,597.0,",
271+
};
272+
tableResultSetEqualTest(
273+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM TUMBLE(DATA => bid, TIMECOL => 'time', SIZE => 1h) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
274+
expectedHeader,
275+
retArray,
276+
DATABASE_NAME);
277+
}
278+
279+
@Test
280+
public void testCumulateFunction() {
281+
String[] expectedHeader =
282+
new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"};
283+
String[] retArray =
284+
new String[] {
285+
"2021-01-01T09:00:00.000Z,2021-01-01T09:06:00.000Z,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
286+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
287+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
288+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
289+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
290+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
291+
"2021-01-01T09:12:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
292+
"2021-01-01T09:12:00.000Z,2021-01-01T09:24:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
293+
};
294+
tableResultSetEqualTest(
295+
"SELECT * FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) ORDER BY stock_id, time",
296+
expectedHeader,
297+
retArray,
298+
DATABASE_NAME);
299+
300+
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
301+
retArray =
302+
new String[] {
303+
"2021-01-01T09:00:00.000Z,2021-01-01T09:06:00.000Z,AAPL,100.0,",
304+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,AAPL,305.0,",
305+
"2021-01-01T09:00:00.000Z,2021-01-01T09:12:00.000Z,TESL,402.0,",
306+
"2021-01-01T09:12:00.000Z,2021-01-01T09:18:00.000Z,TESL,195.0,",
307+
"2021-01-01T09:12:00.000Z,2021-01-01T09:24:00.000Z,TESL,195.0,",
308+
};
309+
tableResultSetEqualTest(
310+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
311+
expectedHeader,
312+
retArray,
313+
DATABASE_NAME);
314+
315+
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
316+
retArray =
317+
new String[] {
318+
"2021-01-01T09:00:00.000Z,2021-01-01T10:00:00.000Z,AAPL,305.0,",
319+
"2021-01-01T09:00:00.000Z,2021-01-01T10:00:00.000Z,TESL,597.0,",
320+
};
321+
tableResultSetEqualTest(
322+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 1h, SIZE => 1h) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
323+
expectedHeader,
324+
retArray,
325+
DATABASE_NAME);
326+
327+
// test UDFException
328+
String errMsg = "Cumulative table function requires size must be an integral multiple of step.";
329+
tableAssertTestFail(
330+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 4m, SIZE => 10m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
331+
errMsg,
332+
DATABASE_NAME);
333+
}
229334
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
package org.apache.iotdb.commons.udf.builtin.relational;
2121

2222
import org.apache.iotdb.commons.udf.builtin.relational.tvf.CapacityTableFunction;
23+
import org.apache.iotdb.commons.udf.builtin.relational.tvf.CumulateTableFunction;
2324
import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction;
2425
import org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction;
26+
import org.apache.iotdb.commons.udf.builtin.relational.tvf.TumbleTableFunction;
2527
import org.apache.iotdb.commons.udf.builtin.relational.tvf.VariationTableFunction;
2628
import org.apache.iotdb.udf.api.relational.TableFunction;
2729

@@ -31,7 +33,9 @@
3133
import java.util.stream.Collectors;
3234

3335
public enum TableBuiltinTableFunction {
36+
TUMBLE("tumble"),
3437
HOP("hop"),
38+
CUMULATE("cumulate"),
3539
SESSION("session"),
3640
VARIATION("variation"),
3741
CAPACITY("capacity");
@@ -62,8 +66,12 @@ public static boolean isBuiltInTableFunction(String functionName) {
6266

6367
public static TableFunction getBuiltinTableFunction(String functionName) {
6468
switch (functionName.toLowerCase()) {
69+
case "tumble":
70+
return new TumbleTableFunction();
6571
case "hop":
6672
return new HOPTableFunction();
73+
case "cumulate":
74+
return new CumulateTableFunction();
6775
case "session":
6876
return new SessionTableFunction();
6977
case "variation":
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.udf.builtin.relational.tvf;
21+
22+
import org.apache.iotdb.udf.api.exception.UDFException;
23+
import org.apache.iotdb.udf.api.relational.TableFunction;
24+
import org.apache.iotdb.udf.api.relational.access.Record;
25+
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
26+
import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
27+
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
28+
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
29+
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
30+
import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
31+
import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
32+
import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
33+
import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
34+
import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
35+
import org.apache.iotdb.udf.api.type.Type;
36+
37+
import org.apache.tsfile.block.column.ColumnBuilder;
38+
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
import java.util.List;
42+
import java.util.Map;
43+
44+
import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
45+
46+
public class CumulateTableFunction implements TableFunction {
47+
48+
private static final String DATA_PARAMETER_NAME = "DATA";
49+
private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
50+
private static final String SIZE_PARAMETER_NAME = "SIZE";
51+
private static final String STEP_PARAMETER_NAME = "STEP";
52+
private static final String ORIGIN_PARAMETER_NAME = "ORIGIN";
53+
54+
@Override
55+
public List<ParameterSpecification> getArgumentsSpecifications() {
56+
return Arrays.asList(
57+
TableParameterSpecification.builder()
58+
.name(DATA_PARAMETER_NAME)
59+
.rowSemantics()
60+
.passThroughColumns()
61+
.build(),
62+
ScalarParameterSpecification.builder()
63+
.name(TIMECOL_PARAMETER_NAME)
64+
.type(Type.STRING)
65+
.defaultValue("time")
66+
.build(),
67+
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
68+
ScalarParameterSpecification.builder().name(STEP_PARAMETER_NAME).type(Type.INT64).build(),
69+
ScalarParameterSpecification.builder()
70+
.name(ORIGIN_PARAMETER_NAME)
71+
.type(Type.TIMESTAMP)
72+
.defaultValue(0L)
73+
.build());
74+
}
75+
76+
@Override
77+
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {
78+
// size must be an integral multiple of step.
79+
long size = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue();
80+
long step = (long) ((ScalarArgument) arguments.get(STEP_PARAMETER_NAME)).getValue();
81+
82+
if (size % step != 0) {
83+
throw new UDFException(
84+
"Cumulative table function requires size must be an integral multiple of step.");
85+
}
86+
87+
TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
88+
String expectedFieldName =
89+
(String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
90+
int requiredIndex =
91+
findColumnIndex(tableArgument, expectedFieldName, Collections.singleton(Type.TIMESTAMP));
92+
DescribedSchema properColumnSchema =
93+
new DescribedSchema.Builder()
94+
.addField("window_start", Type.TIMESTAMP)
95+
.addField("window_end", Type.TIMESTAMP)
96+
.build();
97+
98+
// outputColumnSchema
99+
return TableFunctionAnalysis.builder()
100+
.properColumnSchema(properColumnSchema)
101+
.requireRecordSnapshot(false)
102+
.requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex))
103+
.build();
104+
}
105+
106+
@Override
107+
public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) {
108+
return new TableFunctionProcessorProvider() {
109+
@Override
110+
public TableFunctionDataProcessor getDataProcessor() {
111+
return new CumulateDataProcessor(
112+
(Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(),
113+
(Long) ((ScalarArgument) arguments.get(STEP_PARAMETER_NAME)).getValue(),
114+
(Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue());
115+
}
116+
};
117+
}
118+
119+
private static class CumulateDataProcessor implements TableFunctionDataProcessor {
120+
121+
private final long step;
122+
private final long size;
123+
private final long start;
124+
private long curIndex = 0;
125+
126+
public CumulateDataProcessor(long startTime, long step, long size) {
127+
this.step = step;
128+
this.size = size;
129+
this.start = startTime;
130+
}
131+
132+
@Override
133+
public void process(
134+
Record input,
135+
List<ColumnBuilder> properColumnBuilders,
136+
ColumnBuilder passThroughIndexBuilder) {
137+
// find the first windows
138+
long timeValue = input.getLong(0);
139+
long window_start = (timeValue - start) / size * size;
140+
for (long steps = (timeValue - window_start + step) / step * step;
141+
steps <= size;
142+
steps += step) {
143+
properColumnBuilders.get(0).writeLong(window_start);
144+
properColumnBuilders.get(1).writeLong(window_start + steps);
145+
passThroughIndexBuilder.writeLong(curIndex);
146+
}
147+
curIndex++;
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)