@@ -115,337 +115,3 @@ where o_orderstatus in (
115
115
116
116
Ok ( ( ) )
117
117
}
118
-
119
- #[ tokio:: test]
120
- async fn tpch_q2_correlated ( ) -> Result < ( ) > {
121
- let ctx = SessionContext :: new ( ) ;
122
- register_tpch_csv ( & ctx, "part" ) . await ?;
123
- register_tpch_csv ( & ctx, "supplier" ) . await ?;
124
- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
125
- register_tpch_csv ( & ctx, "nation" ) . await ?;
126
- register_tpch_csv ( & ctx, "region" ) . await ?;
127
-
128
- let sql = r#"select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
129
- from part, supplier, partsupp, nation, region
130
- where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 15 and p_type like '%BRASS'
131
- and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'EUROPE'
132
- and ps_supplycost = (
133
- select min(ps_supplycost) from partsupp, supplier, nation, region
134
- where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey
135
- and n_regionkey = r_regionkey and r_name = 'EUROPE'
136
- )
137
- order by s_acctbal desc, n_name, s_name, p_partkey;"# ;
138
-
139
- // assert plan
140
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
141
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
142
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
143
- let expected = r#"Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST
144
- Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment
145
- Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name
146
- Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.__value
147
- Inner Join: nation.n_regionkey = region.r_regionkey
148
- Inner Join: supplier.s_nationkey = nation.n_nationkey
149
- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
150
- Inner Join: part.p_partkey = partsupp.ps_partkey
151
- Filter: part.p_size = Int32(15) AND part.p_type LIKE Utf8("%BRASS")
152
- TableScan: part projection=[p_partkey, p_mfgr, p_type, p_size], partial_filters=[part.p_size = Int32(15), part.p_type LIKE Utf8("%BRASS")]
153
- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
154
- TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
155
- TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
156
- Filter: region.r_name = Utf8("EUROPE")
157
- TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]
158
- SubqueryAlias: __scalar_sq_1
159
- Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
160
- Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
161
- Inner Join: nation.n_regionkey = region.r_regionkey
162
- Inner Join: supplier.s_nationkey = nation.n_nationkey
163
- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
164
- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
165
- TableScan: supplier projection=[s_suppkey, s_nationkey]
166
- TableScan: nation projection=[n_nationkey, n_regionkey]
167
- Filter: region.r_name = Utf8("EUROPE")
168
- TableScan: region projection=[r_regionkey, r_name], partial_filters=[region.r_name = Utf8("EUROPE")]"# ;
169
- assert_eq ! ( actual, expected) ;
170
-
171
- // assert data
172
- let results = execute_to_batches ( & ctx, sql) . await ;
173
- let expected = vec ! [ "++" , "++" ] ;
174
- assert_batches_eq ! ( expected, & results) ;
175
-
176
- Ok ( ( ) )
177
- }
178
-
179
- #[ tokio:: test]
180
- async fn tpch_q4_correlated ( ) -> Result < ( ) > {
181
- let orders = r#"4,13678,O,53829.87,1995-10-11,5-LOW,Clerk#000000124,0,
182
- 35,12760,O,192885.43,1995-10-23,4-NOT SPECIFIED,Clerk#000000259,0,
183
- 65,1627,P,99763.79,1995-03-18,1-URGENT,Clerk#000000632,0,
184
- "# ;
185
- let lineitems = r#"4,8804,579,1,30,51384,0.03,0.08,N,O,1996-01-10,1995-12-14,1996-01-18,DELIVER IN PERSON,REG AIR,
186
- 35,45,296,1,24,22680.96,0.02,0,N,O,1996-02-21,1996-01-03,1996-03-18,TAKE BACK RETURN,FOB,
187
- 65,5970,481,1,26,48775.22,0.03,0.03,A,F,1995-04-20,1995-04-25,1995-05-13,NONE,TRUCK,
188
- "# ;
189
-
190
- let ctx = SessionContext :: new ( ) ;
191
- register_tpch_csv_data ( & ctx, "orders" , orders) . await ?;
192
- register_tpch_csv_data ( & ctx, "lineitem" , lineitems) . await ?;
193
-
194
- let sql = r#"
195
- select o_orderpriority, count(*) as order_count
196
- from orders
197
- where exists (
198
- select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate)
199
- group by o_orderpriority
200
- order by o_orderpriority;
201
- "# ;
202
-
203
- // assert plan
204
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
205
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
206
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
207
- let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST
208
- Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
209
- Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
210
- LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
211
- TableScan: orders projection=[o_orderkey, o_orderpriority]
212
- Filter: lineitem.l_commitdate < lineitem.l_receiptdate
213
- TableScan: lineitem projection=[l_orderkey, l_commitdate, l_receiptdate]"#
214
- . to_string ( ) ;
215
- assert_eq ! ( actual, expected) ;
216
-
217
- // assert data
218
- let results = execute_to_batches ( & ctx, sql) . await ;
219
- let expected = vec ! [
220
- "+-----------------+-------------+" ,
221
- "| o_orderpriority | order_count |" ,
222
- "+-----------------+-------------+" ,
223
- "| 1-URGENT | 1 |" ,
224
- "| 4-NOT SPECIFIED | 1 |" ,
225
- "| 5-LOW | 1 |" ,
226
- "+-----------------+-------------+" ,
227
- ] ;
228
- assert_batches_eq ! ( expected, & results) ;
229
-
230
- Ok ( ( ) )
231
- }
232
-
233
- #[ tokio:: test]
234
- async fn tpch_q17_correlated ( ) -> Result < ( ) > {
235
- let parts = r#"63700,goldenrod lavender spring chocolate lace,Manufacturer#1,Brand#23,PROMO BURNISHED COPPER,7,MED BOX,901.00,ly. slyly ironi
236
- "# ;
237
- let lineitems = r#"1,63700,7311,2,36.0,45983.16,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,ly final dependencies: slyly bold
238
- 1,63700,3701,3,1.0,13309.6,0.1,0.02,N,O,1996-01-29,1996-03-05,1996-01-31,TAKE BACK RETURN,REG AIR,"riously. regular, express dep"
239
- "# ;
240
-
241
- let ctx = SessionContext :: new ( ) ;
242
- register_tpch_csv_data ( & ctx, "part" , parts) . await ?;
243
- register_tpch_csv_data ( & ctx, "lineitem" , lineitems) . await ?;
244
-
245
- let sql = r#"select sum(l_extendedprice) / 7.0 as avg_yearly
246
- from lineitem, part
247
- where p_partkey = l_partkey and p_brand = 'Brand#23' and p_container = 'MED BOX'
248
- and l_quantity < (
249
- select 0.2 * avg(l_quantity)
250
- from lineitem where l_partkey = p_partkey
251
- );"# ;
252
-
253
- // assert plan
254
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
255
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
256
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
257
- let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
258
- Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]]
259
- Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
260
- Inner Join: part.p_partkey = __scalar_sq_1.l_partkey, lineitem.l_partkey = __scalar_sq_1.l_partkey
261
- Inner Join: lineitem.l_partkey = part.p_partkey
262
- TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice]
263
- Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX")
264
- TableScan: part projection=[p_partkey, p_brand, p_container]
265
- SubqueryAlias: __scalar_sq_1
266
- Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
267
- Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]]
268
- TableScan: lineitem projection=[l_partkey, l_quantity]"#
269
- . to_string ( ) ;
270
- assert_eq ! ( actual, expected) ;
271
-
272
- // assert data
273
- let results = execute_to_batches ( & ctx, sql) . await ;
274
- let expected = vec ! [
275
- "+--------------------+" ,
276
- "| avg_yearly |" ,
277
- "+--------------------+" ,
278
- "| 190.13714285714286 |" ,
279
- "+--------------------+" ,
280
- ] ;
281
- assert_batches_eq ! ( expected, & results) ;
282
-
283
- Ok ( ( ) )
284
- }
285
-
286
- #[ tokio:: test]
287
- async fn tpch_q20_correlated ( ) -> Result < ( ) > {
288
- let ctx = SessionContext :: new ( ) ;
289
- register_tpch_csv ( & ctx, "supplier" ) . await ?;
290
- register_tpch_csv ( & ctx, "nation" ) . await ?;
291
- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
292
- register_tpch_csv ( & ctx, "part" ) . await ?;
293
- register_tpch_csv ( & ctx, "lineitem" ) . await ?;
294
-
295
- let sql = r#"select s_name, s_address
296
- from supplier, nation
297
- where s_suppkey in (
298
- select ps_suppkey from partsupp
299
- where ps_partkey in ( select p_partkey from part where p_name like 'forest%' )
300
- and ps_availqty > ( select 0.5 * sum(l_quantity) from lineitem
301
- where l_partkey = ps_partkey and l_suppkey = ps_suppkey and l_shipdate >= date '1994-01-01'
302
- )
303
- )
304
- and s_nationkey = n_nationkey and n_name = 'CANADA'
305
- order by s_name;
306
- "# ;
307
-
308
- // assert plan
309
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
310
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
311
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
312
- let expected = r#"Sort: supplier.s_name ASC NULLS LAST
313
- Projection: supplier.s_name, supplier.s_address
314
- LeftSemi Join: supplier.s_suppkey = __correlated_sq_1.ps_suppkey
315
- Inner Join: supplier.s_nationkey = nation.n_nationkey
316
- TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
317
- Filter: nation.n_name = Utf8("CANADA")
318
- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("CANADA")]
319
- SubqueryAlias: __correlated_sq_1
320
- Projection: partsupp.ps_suppkey AS ps_suppkey
321
- Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_1.__value
322
- Inner Join: partsupp.ps_partkey = __scalar_sq_1.l_partkey, partsupp.ps_suppkey = __scalar_sq_1.l_suppkey
323
- LeftSemi Join: partsupp.ps_partkey = __correlated_sq_2.p_partkey
324
- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
325
- SubqueryAlias: __correlated_sq_2
326
- Projection: part.p_partkey AS p_partkey
327
- Filter: part.p_name LIKE Utf8("forest%")
328
- TableScan: part projection=[p_partkey, p_name], partial_filters=[part.p_name LIKE Utf8("forest%")]
329
- SubqueryAlias: __scalar_sq_1
330
- Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
331
- Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
332
- Filter: lineitem.l_shipdate >= Date32("8766")
333
- TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate], partial_filters=[lineitem.l_shipdate >= Date32("8766")]"# ;
334
- assert_eq ! ( actual, expected) ;
335
-
336
- // assert data
337
- let results = execute_to_batches ( & ctx, sql) . await ;
338
- let expected = vec ! [ "++" , "++" ] ;
339
- assert_batches_eq ! ( expected, & results) ;
340
-
341
- Ok ( ( ) )
342
- }
343
-
344
- #[ tokio:: test]
345
- async fn tpch_q22_correlated ( ) -> Result < ( ) > {
346
- let ctx = SessionContext :: new ( ) ;
347
- register_tpch_csv ( & ctx, "customer" ) . await ?;
348
- register_tpch_csv ( & ctx, "orders" ) . await ?;
349
-
350
- let sql = r#"select cntrycode, count(*) as numcust, sum(c_acctbal) as totacctbal
351
- from (
352
- select substring(c_phone from 1 for 2) as cntrycode, c_acctbal from customer
353
- where substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17')
354
- and c_acctbal > (
355
- select avg(c_acctbal) from customer where c_acctbal > 0.00
356
- and substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17')
357
- )
358
- and not exists ( select * from orders where o_custkey = c_custkey )
359
- ) as custsale
360
- group by cntrycode
361
- order by cntrycode;"# ;
362
-
363
- // assert plan
364
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
365
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
366
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
367
- let expected = r#"Sort: custsale.cntrycode ASC NULLS LAST
368
- Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
369
- Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
370
- SubqueryAlias: custsale
371
- Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
372
- Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_1.__value
373
- CrossJoin:
374
- LeftAnti Join: customer.c_custkey = orders.o_custkey
375
- Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
376
- TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]
377
- TableScan: orders projection=[o_custkey]
378
- SubqueryAlias: __scalar_sq_1
379
- Projection: AVG(customer.c_acctbal) AS __value
380
- Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
381
- Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
382
- TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# ;
383
- assert_eq ! ( expected, actual) ;
384
-
385
- // assert data
386
- let results = execute_to_batches ( & ctx, sql) . await ;
387
- let expected = vec ! [
388
- "+-----------+---------+------------+" ,
389
- "| cntrycode | numcust | totacctbal |" ,
390
- "+-----------+---------+------------+" ,
391
- "| 18 | 1 | 8324.07 |" ,
392
- "| 30 | 1 | 7638.57 |" ,
393
- "+-----------+---------+------------+" ,
394
- ] ;
395
- assert_batches_eq ! ( expected, & results) ;
396
-
397
- Ok ( ( ) )
398
- }
399
-
400
- #[ tokio:: test]
401
- async fn tpch_q11_correlated ( ) -> Result < ( ) > {
402
- let ctx = SessionContext :: new ( ) ;
403
- register_tpch_csv ( & ctx, "partsupp" ) . await ?;
404
- register_tpch_csv ( & ctx, "supplier" ) . await ?;
405
- register_tpch_csv ( & ctx, "nation" ) . await ?;
406
-
407
- let sql = r#"select ps_partkey, sum(ps_supplycost * ps_availqty) as value
408
- from partsupp, supplier, nation
409
- where ps_suppkey = s_suppkey and s_nationkey = n_nationkey and n_name = 'GERMANY'
410
- group by ps_partkey having
411
- sum(ps_supplycost * ps_availqty) > (
412
- select sum(ps_supplycost * ps_availqty) * 0.0001
413
- from partsupp, supplier, nation
414
- where ps_suppkey = s_suppkey and s_nationkey = n_nationkey and n_name = 'GERMANY'
415
- )
416
- order by value desc;
417
- "# ;
418
-
419
- // assert plan
420
- let dataframe = ctx. sql ( sql) . await . unwrap ( ) ;
421
- let plan = dataframe. into_optimized_plan ( ) . unwrap ( ) ;
422
- let actual = format ! ( "{}" , plan. display_indent( ) ) ;
423
- let expected = r#"Sort: value DESC NULLS FIRST
424
- Projection: partsupp.ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS value
425
- Filter: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Decimal128(38, 15)) > CAST(__scalar_sq_1.__value AS Decimal128(38, 15))
426
- CrossJoin:
427
- Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
428
- Inner Join: supplier.s_nationkey = nation.n_nationkey
429
- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
430
- TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
431
- TableScan: supplier projection=[s_suppkey, s_nationkey]
432
- Filter: nation.n_name = Utf8("GERMANY")
433
- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]
434
- SubqueryAlias: __scalar_sq_1
435
- Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
436
- Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
437
- Inner Join: supplier.s_nationkey = nation.n_nationkey
438
- Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
439
- TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
440
- TableScan: supplier projection=[s_suppkey, s_nationkey]
441
- Filter: nation.n_name = Utf8("GERMANY")
442
- TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8("GERMANY")]"# ;
443
- assert_eq ! ( actual, expected) ;
444
-
445
- // assert data
446
- let results = execute_to_batches ( & ctx, sql) . await ;
447
- let expected = vec ! [ "++" , "++" ] ;
448
- assert_batches_eq ! ( expected, & results) ;
449
-
450
- Ok ( ( ) )
451
- }
0 commit comments