@@ -31,6 +31,7 @@ import org.apache.spark.sql.types._
3131import org .apache .spark .sql .vectorized .ColumnVector
3232
3333import org .apache .comet .CometConf
34+ import org .apache .comet .CometConf .{SCAN_NATIVE_COMET , SCAN_NATIVE_DATAFUSION , SCAN_NATIVE_ICEBERG_COMPAT }
3435import org .apache .comet .parquet .BatchReader
3536
3637/**
@@ -60,7 +61,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
6061 }
6162
6263 sqlBenchmark.addCase(" SQL Parquet - Comet" ) { _ =>
63- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
64+ withSQLConf(
65+ CometConf .COMET_ENABLED .key -> " true" ,
66+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
67+ spark.sql(s " select $query from parquetV1Table " ).noop()
68+ }
69+ }
70+
71+ sqlBenchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
72+ withSQLConf(
73+ CometConf .COMET_ENABLED .key -> " true" ,
74+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
75+ spark.sql(s " select $query from parquetV1Table " ).noop()
76+ }
77+ }
78+
79+ sqlBenchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
80+ withSQLConf(
81+ CometConf .COMET_ENABLED .key -> " true" ,
82+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
6483 spark.sql(s " select $query from parquetV1Table " ).noop()
6584 }
6685 }
@@ -89,7 +108,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
89108 }
90109
91110 sqlBenchmark.addCase(" SQL Parquet - Comet" ) { _ =>
92- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
111+ withSQLConf(
112+ CometConf .COMET_ENABLED .key -> " true" ,
113+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
114+ spark.sql(" select sum(id) from parquetV1Table" ).noop()
115+ }
116+ }
117+
118+ sqlBenchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
119+ withSQLConf(
120+ CometConf .COMET_ENABLED .key -> " true" ,
121+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
122+ spark.sql(" select sum(id) from parquetV1Table" ).noop()
123+ }
124+ }
125+
126+ sqlBenchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
127+ withSQLConf(
128+ CometConf .COMET_ENABLED .key -> " true" ,
129+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
93130 spark.sql(" select sum(id) from parquetV1Table" ).noop()
94131 }
95132 }
@@ -197,7 +234,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
197234 }
198235
199236 benchmark.addCase(" SQL Parquet - Comet" ) { _ =>
200- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
237+ withSQLConf(
238+ CometConf .COMET_ENABLED .key -> " true" ,
239+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
240+ spark.sql(" select sum(c2) from parquetV1Table where c1 + 1 > 0" ).noop()
241+ }
242+ }
243+
244+ benchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
245+ withSQLConf(
246+ CometConf .COMET_ENABLED .key -> " true" ,
247+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
248+ spark.sql(" select sum(c2) from parquetV1Table where c1 + 1 > 0" ).noop()
249+ }
250+ }
251+
252+ benchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
253+ withSQLConf(
254+ CometConf .COMET_ENABLED .key -> " true" ,
255+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
201256 spark.sql(" select sum(c2) from parquetV1Table where c1 + 1 > 0" ).noop()
202257 }
203258 }
@@ -216,26 +271,44 @@ object CometReadBenchmark extends CometBenchmarkBase {
216271 prepareTable(
217272 dir,
218273 spark.sql(s """
219- |WITH tmp
220- | AS (SELECT RAND() r FROM $tbl)
221- |SELECT
222- | CASE
223- | WHEN r < 0.2 THEN 'aaa'
224- | WHEN r < 0.4 THEN 'bbb'
225- | WHEN r < 0.6 THEN 'ccc'
226- | WHEN r < 0.8 THEN 'ddd'
227- | ELSE 'eee'
228- | END
229- |AS id
230- |FROM tmp
231- | """ .stripMargin))
274+ |WITH tmp
275+ | AS (SELECT RAND() r FROM $tbl)
276+ |SELECT
277+ | CASE
278+ | WHEN r < 0.2 THEN 'aaa'
279+ | WHEN r < 0.4 THEN 'bbb'
280+ | WHEN r < 0.6 THEN 'ccc'
281+ | WHEN r < 0.8 THEN 'ddd'
282+ | ELSE 'eee'
283+ | END
284+ |AS id
285+ |FROM tmp
286+ | """ .stripMargin))
232287
233288 sqlBenchmark.addCase(" SQL Parquet - Spark" ) { _ =>
234289 spark.sql(" select sum(length(id)) from parquetV1Table" ).noop()
235290 }
236291
237292 sqlBenchmark.addCase(" SQL Parquet - Comet" ) { _ =>
238- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
293+ withSQLConf(
294+ CometConf .COMET_ENABLED .key -> " true" ,
295+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
296+ spark.sql(" select sum(length(id)) from parquetV1Table" ).noop()
297+ }
298+ }
299+
300+ sqlBenchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
301+ withSQLConf(
302+ CometConf .COMET_ENABLED .key -> " true" ,
303+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
304+ spark.sql(" select sum(length(id)) from parquetV1Table" ).noop()
305+ }
306+ }
307+
308+ sqlBenchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
309+ withSQLConf(
310+ CometConf .COMET_ENABLED .key -> " true" ,
311+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
239312 spark.sql(" select sum(length(id)) from parquetV1Table" ).noop()
240313 }
241314 }
@@ -266,7 +339,31 @@ object CometReadBenchmark extends CometBenchmarkBase {
266339 }
267340
268341 benchmark.addCase(" SQL Parquet - Comet" ) { _ =>
269- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
342+ withSQLConf(
343+ CometConf .COMET_ENABLED .key -> " true" ,
344+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
345+ spark
346+ .sql(" select sum(length(c2)) from parquetV1Table where c1 is " +
347+ " not NULL and c2 is not NULL" )
348+ .noop()
349+ }
350+ }
351+
352+ benchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
353+ withSQLConf(
354+ CometConf .COMET_ENABLED .key -> " true" ,
355+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
356+ spark
357+ .sql(" select sum(length(c2)) from parquetV1Table where c1 is " +
358+ " not NULL and c2 is not NULL" )
359+ .noop()
360+ }
361+ }
362+
363+ benchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
364+ withSQLConf(
365+ CometConf .COMET_ENABLED .key -> " true" ,
366+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
270367 spark
271368 .sql(" select sum(length(c2)) from parquetV1Table where c1 is " +
272369 " not NULL and c2 is not NULL" )
@@ -296,7 +393,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
296393 }
297394
298395 benchmark.addCase(" SQL Parquet - Comet" ) { _ =>
299- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
396+ withSQLConf(
397+ CometConf .COMET_ENABLED .key -> " true" ,
398+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
399+ spark.sql(s " SELECT sum(c $middle) FROM parquetV1Table " ).noop()
400+ }
401+ }
402+
403+ benchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
404+ withSQLConf(
405+ CometConf .COMET_ENABLED .key -> " true" ,
406+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
407+ spark.sql(s " SELECT sum(c $middle) FROM parquetV1Table " ).noop()
408+ }
409+ }
410+
411+ benchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
412+ withSQLConf(
413+ CometConf .COMET_ENABLED .key -> " true" ,
414+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
300415 spark.sql(s " SELECT sum(c $middle) FROM parquetV1Table " ).noop()
301416 }
302417 }
@@ -327,7 +442,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
327442 }
328443
329444 benchmark.addCase(" SQL Parquet - Comet" ) { _ =>
330- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
445+ withSQLConf(
446+ CometConf .COMET_ENABLED .key -> " true" ,
447+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
448+ spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
449+ }
450+ }
451+
452+ benchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
453+ withSQLConf(
454+ CometConf .COMET_ENABLED .key -> " true" ,
455+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
456+ spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
457+ }
458+ }
459+
460+ benchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
461+ withSQLConf(
462+ CometConf .COMET_ENABLED .key -> " true" ,
463+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
331464 spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
332465 }
333466 }
@@ -358,7 +491,25 @@ object CometReadBenchmark extends CometBenchmarkBase {
358491 }
359492
360493 benchmark.addCase(" SQL Parquet - Comet" ) { _ =>
361- withSQLConf(CometConf .COMET_ENABLED .key -> " true" ) {
494+ withSQLConf(
495+ CometConf .COMET_ENABLED .key -> " true" ,
496+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_COMET ) {
497+ spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
498+ }
499+ }
500+
501+ benchmark.addCase(" SQL Parquet - Comet Native DataFusion" ) { _ =>
502+ withSQLConf(
503+ CometConf .COMET_ENABLED .key -> " true" ,
504+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_DATAFUSION ) {
505+ spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
506+ }
507+ }
508+
509+ benchmark.addCase(" SQL Parquet - Comet Native Iceberg Compat" ) { _ =>
510+ withSQLConf(
511+ CometConf .COMET_ENABLED .key -> " true" ,
512+ CometConf .COMET_NATIVE_SCAN_IMPL .key -> SCAN_NATIVE_ICEBERG_COMPAT ) {
362513 spark.sql(" SELECT * FROM parquetV1Table WHERE c1 + 1 > 0" ).noop()
363514 }
364515 }
0 commit comments