Commit 89c1b86

Add tests for spark sql verification
Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent 5e4f6bc

2 files changed: +445 -0 lines changed
Lines changed: 336 additions & 0 deletions

@@ -0,0 +1,336 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ppl.calcite;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.test.CalciteAssert;
import org.junit.Test;

/**
 * Test suite for PPL queries with dynamic fields in permissive mode. Tests Calcite PPL to Spark SQL
 * conversion for queries that access dynamic/unmapped fields via the _MAP column.
 */
public class CalcitePPLDynamicFieldsTest extends CalcitePPLPermissiveAbstractTest {

  public CalcitePPLDynamicFieldsTest() {
    super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL);
  }

  @Test
  public void testScanTableWithDynamicFields() {
    String ppl = "source=test_dynamic";
    RelNode root = getRelNode(ppl);
    String expectedLogical = "LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql = "SELECT *\n" + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }

  @Test
  public void testProjectStaticFields() {
    String ppl = "source=test_dynamic | fields id, name";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], name=[$1])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql = "SELECT `id`, `name`\n" + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
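
  // An unmapped field (`status`) resolves to a lookup on the _MAP column, as the expected
  // plans below show: ITEM($2, 'status') in the logical plan, `_MAP`['status'] in Spark SQL.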
  @Test
  public void testProjectDynamicField() {
    String ppl = "source=test_dynamic | fields id, status";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], status=[ITEM($2, 'status')])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `id`, `_MAP`['status'] `status`\n" + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }

  @Test
  public void testProjectMixedFields() {
    String ppl = "source=test_dynamic | fields id, name, status, category";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], name=[$1], status=[ITEM($2, 'status')], category=[ITEM($2,"
            + " 'category')])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `id`, `name`, `_MAP`['status'] `status`, `_MAP`['category'] `category`\n"
            + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
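
  // Predicates on dynamic fields keep the map lookup inline in the WHERE clause; no
  // projected helper column is needed.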
  @Test
  public void testFilterOnDynamicField() {
    String ppl = "source=test_dynamic | where status = 'active'";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalFilter(condition=[=(ITEM($2, 'status'), 'active':VARCHAR)])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT *\n" + "FROM `scott`.`test_dynamic`\n" + "WHERE `_MAP`['status'] = 'active'";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }

  @Test
  public void testFilterOnMixedFields() {
    String ppl = "source=test_dynamic | where id > 10 AND status = 'active'";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalFilter(condition=[AND(>($0, 10), =(ITEM($2, 'status'), 'active':VARCHAR))])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT *\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "WHERE `id` > 10 AND `_MAP`['status'] = 'active'";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
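
  // Sorting on a dynamic field projects the map lookup into a helper column ($f3), sorts on
  // it, then projects it back out, as the expected plan below shows.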
  @Test
  public void testSortByDynamicField() {
    String ppl = "source=test_dynamic | sort status";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], name=[$1], _MAP=[$2])\n"
            + "  LogicalSort(sort0=[$3], dir0=[ASC-nulls-first])\n"
            + "    LogicalProject(id=[$0], name=[$1], _MAP=[$2], $f3=[ITEM($2, 'status')])\n"
            + "      LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `id`, `name`, `_MAP`\n"
            + "FROM (SELECT `id`, `name`, `_MAP`, `_MAP`['status'] `$f3`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "ORDER BY 4) `t0`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
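
  // Grouping on a dynamic field repeats the map lookup in both the SELECT list and the
  // GROUP BY clause of the generated Spark SQL.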
  @Test
  public void testAggregationWithDynamicField() {
    String ppl = "source=test_dynamic | stats count() by status";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(count()=[$1], status=[$0])\n"
            + "  LogicalAggregate(group=[{0}], count()=[COUNT()])\n"
            + "    LogicalProject(status=[ITEM($2, 'status')])\n"
            + "      LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT COUNT(*) `count()`, `_MAP`['status'] `status`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY `_MAP`['status']";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
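
  // Since `@timestamp` is itself dynamic here, timechart reads it via `_MAP`['@timestamp'],
  // SAFE_CASTs the value, and buckets it with SPAN before counting.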
  @Test
  public void testTimechartWithDynamicTimestamp() {
    String ppl = "source=test_dynamic | timechart count()";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalSort(sort0=[$0], dir0=[ASC])\n"
            + "  LogicalProject(@timestamp=[$0], count=[$1])\n"
            + "    LogicalAggregate(group=[{0}], count=[COUNT()])\n"
            + "      LogicalProject(timestamp=[SPAN(SAFE_CAST(ITEM($2, '@timestamp')), 1, 'm')])\n"
            + "        LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'm') `@timestamp`, COUNT(*)"
            + " `count`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'm')\n"
            + "ORDER BY 1 NULLS LAST";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
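
  // `timechart ... by status` expands into a union, as the expected SQL below shows:
  // per-bucket counts for the top 10 `status` values (everything else folded into 'OTHER'),
  // plus zero-filled rows so every bucket/status combination appears in the output.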
  @Test
  public void testTimechartWithDynamicFieldGroupBy() {
    String ppl = "source=test_dynamic | timechart span=1h count() by status";
    RelNode root = getRelNode(ppl);
    String expectedSparkSql =
        "SELECT `@timestamp`, `status`, SUM(`actual_count`) `count`\n"
            + "FROM (SELECT CAST(`t1`.`@timestamp` AS TIMESTAMP) `@timestamp`, CASE WHEN"
            + " `t7`.`status` IS NOT NULL THEN `t1`.`status` ELSE CASE WHEN `t1`.`status` IS NULL"
            + " THEN NULL ELSE 'OTHER' END END `status`, SUM(`t1`.`$f2_0`) `actual_count`\n"
            + "FROM (SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h') `@timestamp`,"
            + " SAFE_CAST(`_MAP`['status'] AS STRING) `status`, COUNT(*) `$f2_0`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY SAFE_CAST(`_MAP`['status'] AS STRING),"
            + " `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')) `t1`\n"
            + "LEFT JOIN (SELECT `status`, SUM(`$f2_0`) `grand_total`\n"
            + "FROM (SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h') `@timestamp`,"
            + " SAFE_CAST(`_MAP`['status'] AS STRING) `status`, COUNT(*) `$f2_0`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY SAFE_CAST(`_MAP`['status'] AS STRING),"
            + " `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')) `t4`\n"
            + "WHERE `status` IS NOT NULL\n"
            + "GROUP BY `status`\n"
            + "ORDER BY 2 DESC NULLS FIRST\n"
            + "LIMIT 10) `t7` ON `t1`.`status` IS NOT DISTINCT FROM `t7`.`status`\n"
            + "GROUP BY CAST(`t1`.`@timestamp` AS TIMESTAMP), CASE WHEN `t7`.`status` IS NOT NULL"
            + " THEN `t1`.`status` ELSE CASE WHEN `t1`.`status` IS NULL THEN NULL ELSE 'OTHER' END"
            + " END\n"
            + "UNION\n"
            + "SELECT CAST(`t13`.`@timestamp` AS TIMESTAMP) `@timestamp`, `t24`.`$f0` `status`, 0"
            + " `count`\n"
            + "FROM (SELECT `@timestamp`\n"
            + "FROM (SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')"
            + " `@timestamp`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY SAFE_CAST(`_MAP`['status'] AS STRING),"
            + " `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')) `t12`\n"
            + "GROUP BY `@timestamp`) `t13`\n"
            + "CROSS JOIN (SELECT CASE WHEN `t22`.`status` IS NOT NULL THEN `t16`.`status` ELSE"
            + " CASE WHEN `t16`.`status` IS NULL THEN NULL ELSE 'OTHER' END END `$f0`\n"
            + "FROM (SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h') `@timestamp`,"
            + " SAFE_CAST(`_MAP`['status'] AS STRING) `status`, COUNT(*) `$f2_0`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY SAFE_CAST(`_MAP`['status'] AS STRING),"
            + " `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')) `t16`\n"
            + "LEFT JOIN (SELECT `status`, SUM(`$f2_0`) `grand_total`\n"
            + "FROM (SELECT `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h') `@timestamp`,"
            + " SAFE_CAST(`_MAP`['status'] AS STRING) `status`, COUNT(*) `$f2_0`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "GROUP BY SAFE_CAST(`_MAP`['status'] AS STRING),"
            + " `SPAN`(SAFE_CAST(`_MAP`['@timestamp'] AS STRING), 1, 'h')) `t19`\n"
            + "WHERE `status` IS NOT NULL\n"
            + "GROUP BY `status`\n"
            + "ORDER BY 2 DESC NULLS FIRST\n"
            + "LIMIT 10) `t22` ON `t16`.`status` IS NOT DISTINCT FROM `t22`.`status`\n"
            + "GROUP BY CASE WHEN `t22`.`status` IS NOT NULL THEN `t16`.`status` ELSE CASE WHEN"
            + " `t16`.`status` IS NULL THEN NULL ELSE 'OTHER' END END) `t24`) `t26`\n"
            + "GROUP BY `@timestamp`, `status`\n"
            + "ORDER BY `@timestamp` NULLS LAST, `status` NULLS LAST";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
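
  // trendline sma(2, latency) lowers to windowed SUM/COUNT over the previous and current
  // rows, guarded by CASE so the average stays NULL until the window holds two rows; rows
  // where the dynamic field is missing are filtered out first.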
  @Test
  public void testTrendlineWithDynamicField() {
    String ppl =
        "source=test_dynamic | trendline sma(2, latency) as latency_trend | fields latency,"
            + " latency_trend";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(latency=[$3], latency_trend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1),"
            + " /(SUM(SAFE_CAST($3)) OVER (ROWS 1 PRECEDING), CAST(COUNT($3) OVER (ROWS 1"
            + " PRECEDING)):DOUBLE NOT NULL), null:NULL)])\n"
            + "  LogicalFilter(condition=[IS NOT NULL($3)])\n"
            + "    LogicalProject(id=[$0], name=[$1], _MAP=[$2], latency=[ITEM($2, 'latency')])\n"
            + "      LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `latency`, CASE WHEN (COUNT(*) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)) >"
            + " 1 THEN (SUM(SAFE_CAST(`latency` AS INTEGER)) OVER (ROWS BETWEEN 1 PRECEDING AND"
            + " CURRENT ROW)) / CAST(COUNT(`latency`) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT"
            + " ROW) AS DOUBLE) ELSE NULL END `latency_trend`\n"
            + "FROM (SELECT `id`, `name`, `_MAP`, `_MAP`['latency'] `latency`\n"
            + "FROM `scott`.`test_dynamic`) `t`\n"
            + "WHERE `latency` IS NOT NULL";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
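
  // With an explicit sort key, rows are ordered on SAFE_CAST(`_MAP`['status'] AS STRING)
  // before the moving-average window is applied.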
  @Test
  public void testTrendlineSortByDynamicField() {
    String ppl =
        "source=test_dynamic | trendline sort status sma(2, latency) as latency_trend | fields"
            + " latency, latency_trend";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(latency=[$4], latency_trend=[CASE(>(COUNT() OVER (ROWS 1 PRECEDING), 1),"
            + " /(SUM(SAFE_CAST($4)) OVER (ROWS 1 PRECEDING), CAST(COUNT($4) OVER (ROWS 1"
            + " PRECEDING)):DOUBLE NOT NULL), null:NULL)])\n"
            + "  LogicalFilter(condition=[IS NOT NULL($4)])\n"
            + "    LogicalProject(id=[$0], name=[$1], _MAP=[$2], status=[$3], latency=[ITEM($2,"
            + " 'latency')])\n"
            + "      LogicalSort(sort0=[$3], dir0=[ASC])\n"
            + "        LogicalProject(id=[$0], name=[$1], _MAP=[$2],"
            + " status=[SAFE_CAST(ITEM($2, 'status'))])\n"
            + "          LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `_MAP`['latency'] `latency`, CASE WHEN (COUNT(*) OVER (ROWS BETWEEN 1 PRECEDING"
            + " AND CURRENT ROW)) > 1 THEN (SUM(SAFE_CAST(`_MAP`['latency'] AS INTEGER)) OVER (ROWS"
            + " BETWEEN 1 PRECEDING AND CURRENT ROW)) / CAST(COUNT(`_MAP`['latency']) OVER (ROWS"
            + " BETWEEN 1 PRECEDING AND CURRENT ROW) AS DOUBLE) ELSE NULL END `latency_trend`\n"
            + "FROM (SELECT `id`, `name`, `_MAP`, SAFE_CAST(`_MAP`['status'] AS STRING) `status`\n"
            + "FROM `scott`.`test_dynamic`\n"
            + "ORDER BY 4 NULLS LAST) `t0`\n"
            + "WHERE `_MAP`['latency'] IS NOT NULL";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }

  @Test
  public void testEventstatsWithDynamicField() {
    String ppl = "source=test_dynamic | eventstats count()";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], name=[$1], _MAP=[$2], count()=[COUNT() OVER ()])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `id`, `name`, `_MAP`, COUNT(*) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND"
            + " UNBOUNDED FOLLOWING) `count()`\n"
            + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
297+
298+
@Test
299+
public void testEventstatsByDynamicField() {
300+
String ppl = "source=test_dynamic | eventstats max(id) by status";
301+
RelNode root = getRelNode(ppl);
302+
String expectedLogical =
303+
"LogicalProject(id=[$0], name=[$1], _MAP=[$2], max(id)=[MAX($0) OVER (PARTITION BY"
304+
+ " ITEM($2, 'status'))])\n"
305+
+ " LogicalTableScan(table=[[scott, test_dynamic]])\n";
306+
verifyLogical(root, expectedLogical);
307+
308+
String expectedSparkSql =
309+
"SELECT `id`, `name`, `_MAP`, MAX(`id`) OVER (PARTITION BY `_MAP`['status'] RANGE BETWEEN"
310+
+ " UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) `max(id)`\n"
311+
+ "FROM `scott`.`test_dynamic`";
312+
verifyPPLToSparkSQL(root, expectedSparkSql);
313+
}
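
  // avg() over a dynamic field lowers to SUM/COUNT window functions partitioned by the map
  // lookup, with the value SAFE_CAST to INTEGER and the count CAST to DOUBLE for the division.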
  @Test
  public void testEventstatsAvgDynamicField() {
    String ppl = "source=test_dynamic | eventstats avg(latency) by status";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(id=[$0], name=[$1], _MAP=[$2], avg(latency)=[/(SUM(SAFE_CAST(ITEM($2,"
            + " 'latency'))) OVER (PARTITION BY ITEM($2, 'status')),"
            + " CAST(COUNT(SAFE_CAST(ITEM($2, 'latency'))) OVER (PARTITION BY ITEM($2,"
            + " 'status'))):DOUBLE NOT NULL)])\n"
            + "  LogicalTableScan(table=[[scott, test_dynamic]])\n";
    verifyLogical(root, expectedLogical);

    String expectedSparkSql =
        "SELECT `id`, `name`, `_MAP`, (SUM(SAFE_CAST(`_MAP`['latency'] AS INTEGER)) OVER"
            + " (PARTITION BY `_MAP`['status'] RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED"
            + " FOLLOWING)) / CAST(COUNT(SAFE_CAST(`_MAP`['latency'] AS INTEGER)) OVER (PARTITION"
            + " BY `_MAP`['status'] RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS"
            + " DOUBLE) `avg(latency)`\n"
            + "FROM `scott`.`test_dynamic`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }
}
