Skip to content

Commit 2951f5f

Browse files
committed
[CALCITE-7410] TIMESTAMP type for TUMBLE and HOP is hardwired to TIMESTAMP(3)
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
1 parent df31d6f commit 2951f5f

File tree

5 files changed

+208
-138
lines changed

5 files changed

+208
-138
lines changed

core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
import static org.apache.calcite.util.Static.RESOURCE;
4242

43+
import static java.util.Objects.requireNonNull;
44+
4345
/**
4446
* Base class for a table-valued function that computes windows. Examples
4547
* include {@code TUMBLE}, {@code HOP} and {@code SESSION}.
@@ -107,14 +109,37 @@ public SqlWindowTableFunction(String name, SqlOperandMetadata operandMetadata) {
107109
private static RelDataType inferRowType(SqlOperatorBinding opBinding) {
108110
final RelDataType inputRowType = opBinding.getOperandType(0);
109111
final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
112+
int precision = getTimestampPrecision(opBinding);
110113
return typeFactory.builder()
111114
.kind(inputRowType.getStructKind())
112115
.addAll(inputRowType.getFieldList())
113-
.add("window_start", SqlTypeName.TIMESTAMP, 3)
114-
.add("window_end", SqlTypeName.TIMESTAMP, 3)
116+
.add("window_start", SqlTypeName.TIMESTAMP, precision)
117+
.add("window_end", SqlTypeName.TIMESTAMP, precision)
115118
.build();
116119
}
117120

121+
/** Extract the precision for the start_window, end_window columns from the column supplied as
122+
* DESCRIPTOR for the window function. */
123+
private static int getTimestampPrecision(SqlOperatorBinding opBinding) {
124+
RelDataType inputRowType = opBinding.getOperandType(0);
125+
SqlCallBinding callBinding = (SqlCallBinding) opBinding;
126+
// Locate the "descriptor" argument
127+
for (SqlNode operand : callBinding.operands()) {
128+
if (operand instanceof SqlCall) {
129+
SqlCall opCall = (SqlCall) operand;
130+
if (opCall.getOperator().getKind() == SqlKind.DESCRIPTOR) {
131+
SqlNode descriptor = opCall.operand(0);
132+
SqlIdentifier id = (SqlIdentifier) descriptor;
133+
RelDataTypeField field =
134+
inputRowType.getField(id.getSimple(), false, false);
135+
return requireNonNull(field, "field").getType().getPrecision();
136+
}
137+
}
138+
}
139+
// Should be unreachable, since validation succeeded
140+
throw new RuntimeException("Could not locate DESCRIPTOR column");
141+
}
142+
118143
/** Partial implementation of operand type checker. */
119144
protected abstract static class AbstractOperandMetadata
120145
implements SqlOperandMetadata {

core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2502,6 +2502,13 @@ void checkCorrelatedMapSubQuery(boolean expand) {
25022502
sql(sql).ok();
25032503
}
25042504

2505+
@Test void testTableFunctionTumbleConvert() {
2506+
final String sql = "with t as (select CAST(rowtime AS TIMESTAMP(2)) as rowtime FROM Shipments) "
2507+
+ "select *\n"
2508+
+ "from table(tumble(table t, descriptor(rowtime), INTERVAL '1.5' SECOND))";
2509+
sql(sql).ok();
2510+
}
2511+
25052512
@Test void testTableFunctionTumbleWithParamNames() {
25062513
final String sql = "select *\n"
25072514
+ "from table(\n"

core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8404,7 +8404,7 @@ from table(hop(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE, INTERV
84048404
<Resource name="plan">
84058405
<![CDATA[
84068406
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8407-
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8407+
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84088408
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84098409
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84108410
]]>
@@ -8418,7 +8418,7 @@ from table(hop(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE, INTERV
84188418
<Resource name="plan">
84198419
<![CDATA[
84208420
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8421-
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 300000:INTERVAL MINUTE, 180000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8421+
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 300000:INTERVAL MINUTE, 180000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84228422
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84238423
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84248424
]]>
@@ -8437,7 +8437,7 @@ hop(
84378437
<Resource name="plan">
84388438
<![CDATA[
84398439
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8440-
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8440+
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84418441
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84428442
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84438443
]]>
@@ -8456,7 +8456,7 @@ hop(
84568456
<Resource name="plan">
84578457
<![CDATA[
84588458
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8459-
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8459+
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84608460
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84618461
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84628462
]]>
@@ -8470,7 +8470,7 @@ from table(hop((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINU
84708470
<Resource name="plan">
84718471
<![CDATA[
84728472
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8473-
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8473+
LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84748474
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84758475
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84768476
]]>
@@ -8484,7 +8484,7 @@ from table(session(table Shipments, descriptor(rowtime), descriptor(orderId), IN
84848484
<Resource name="plan">
84858485
<![CDATA[
84868486
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8487-
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8487+
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
84888488
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
84898489
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
84908490
]]>
@@ -8498,7 +8498,7 @@ from table(session(table Orders, descriptor(rowtime), descriptor(orderId, produc
84988498
<Resource name="plan">
84998499
<![CDATA[
85008500
LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2], window_start=[$3], window_end=[$4])
8501-
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($0), DESCRIPTOR($2, $1), 600000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP(0) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8501+
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($0), DESCRIPTOR($2, $1), 600000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP(0) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
85028502
LogicalProject(ROWTIME=[$0], PRODUCTID=[$1], ORDERID=[$2])
85038503
LogicalTableScan(table=[[CATALOG, SALES, ORDERS]])
85048504
]]>
@@ -8517,7 +8517,7 @@ session(
85178517
<Resource name="plan">
85188518
<![CDATA[
85198519
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8520-
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8520+
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
85218521
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
85228522
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
85238523
]]>
@@ -8536,7 +8536,7 @@ session(
85368536
<Resource name="plan">
85378537
<![CDATA[
85388538
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8539-
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8539+
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
85408540
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
85418541
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
85428542
]]>
@@ -8550,7 +8550,7 @@ from table(session((select * from Shipments), descriptor(rowtime), descriptor(or
85508550
<Resource name="plan">
85518551
<![CDATA[
85528552
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8553-
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8553+
LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
85548554
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
85558555
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
85568556
]]>
@@ -8585,9 +8585,23 @@ from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE))]]>
85858585
<Resource name="plan">
85868586
<![CDATA[
85878587
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8588-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8588+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
85898589
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
85908590
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
8591+
]]>
8592+
</Resource>
8593+
</TestCase>
8594+
<TestCase name="testTableFunctionTumbleConvert">
8595+
<Resource name="sql">
8596+
<![CDATA[with t as (select CAST(rowtime AS TIMESTAMP(2)) as rowtime FROM Shipments) select *
8597+
from table(tumble(table t, descriptor(rowtime), INTERVAL '1.5' SECOND))]]>
8598+
</Resource>
8599+
<Resource name="plan">
8600+
<![CDATA[
8601+
LogicalProject(ROWTIME=[$0], window_start=[$1], window_end=[$2])
8602+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($0), 1500:INTERVAL SECOND)], rowType=[RecordType(TIMESTAMP(2) ROWTIME, TIMESTAMP(2) window_start, TIMESTAMP(2) window_end)])
8603+
LogicalProject(ROWTIME=[CAST($1):TIMESTAMP(2) NOT NULL])
8604+
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
85918605
]]>
85928606
</Resource>
85938607
</TestCase>
@@ -8602,10 +8616,10 @@ on a.orderid = b.orderid]]>
86028616
<![CDATA[
86038617
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3], ORDERID0=[$4], ROWTIME0=[$5], window_start0=[$6], window_end0=[$7])
86048618
LogicalJoin(condition=[=($0, $4)], joinType=[inner])
8605-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8619+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86068620
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86078621
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
8608-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8622+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86098623
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86108624
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
86118625
]]>
@@ -8620,7 +8634,7 @@ from table(tumble(table Shipments, descriptor(rowtime),
86208634
<Resource name="plan">
86218635
<![CDATA[
86228636
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8623-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE, 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8637+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 600000:INTERVAL MINUTE, 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86248638
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86258639
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
86268640
]]>
@@ -8638,7 +8652,7 @@ tumble(
86388652
<Resource name="plan">
86398653
<![CDATA[
86408654
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8641-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8655+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86428656
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86438657
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
86448658
]]>
@@ -8656,7 +8670,7 @@ tumble(
86568670
<Resource name="plan">
86578671
<![CDATA[
86588672
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8659-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8673+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86608674
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86618675
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
86628676
]]>
@@ -8670,7 +8684,7 @@ from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' M
86708684
<Resource name="plan">
86718685
<![CDATA[
86728686
LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3])
8673-
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end)])
8687+
LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)])
86748688
LogicalProject(ORDERID=[$0], ROWTIME=[$1])
86758689
LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]])
86768690
]]>

0 commit comments

Comments
 (0)