Skip to content

Commit c5343f0

Browse files
committed
benchmark
1 parent d2090a1 commit c5343f0

File tree

3 files changed

+195
-3
lines changed

3 files changed

+195
-3
lines changed

native/core/src/execution/expressions/strings.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,17 @@ impl ExpressionBuilder for FromJsonBuilder {
111111
planner: &PhysicalPlanner,
112112
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
113113
let expr = extract_expr!(spark_expr, FromJson);
114-
let child = planner.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
115-
let schema = to_arrow_datatype(expr.schema.as_ref().unwrap());
114+
let child = planner.create_expr(
115+
expr.child
116+
.as_ref()
117+
.ok_or_else(|| ExecutionError::GeneralError("FromJson missing child".to_string()))?,
118+
input_schema,
119+
)?;
120+
let schema = to_arrow_datatype(
121+
expr.schema
122+
.as_ref()
123+
.ok_or_else(|| ExecutionError::GeneralError("FromJson missing schema".to_string()))?,
124+
);
116125
Ok(Arc::new(FromJson::new(child, schema, &expr.timezone)))
117126
}
118127
}

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import org.scalactic.source.Position
2323
import org.scalatest.Tag
2424

2525
import org.apache.spark.sql.CometTestBase
26-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2726
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
27+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2828

2929
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
3030

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.benchmark
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
24+
25+
import org.apache.comet.CometConf
26+
27+
/**
 * Describes a single JSON expression benchmark case.
 *
 * @param name
 *   Name of the benchmark case (also used to select the generated input data)
 * @param schema
 *   Target schema string passed to from_json
 * @param query
 *   SQL query executed by the benchmark
 * @param extraCometConfigs
 *   Additional Comet settings applied in the scan+exec case
 */
case class JsonExprConfig(
    name: String,
    schema: String,
    query: String,
    extraCometConfigs: Map[String, String] = Map.empty)
43+
44+
// spotless:off
45+
/**
46+
* Benchmark to measure performance of Comet JSON expressions. To run this benchmark:
47+
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometJsonExpressionBenchmark`
48+
* Results will be written to "spark/benchmarks/CometJsonExpressionBenchmark-**results.txt".
49+
*/
50+
// spotless:on
51+
object CometJsonExpressionBenchmark extends CometBenchmarkBase {

  /** SQL producing simple two-field records: {"a":<int>,"b":"str_<int>"}. */
  private def simplePrimitivesJsonSql: String = s"""
    SELECT
      concat('{"a":', CAST(value AS STRING), ',"b":"str_', CAST(value AS STRING), '"}') AS json_str
    FROM $tbl
  """

  /**
   * Builds the JSON input data for a benchmark case, dispatching on the case name.
   *
   * "from_json - simple primitives", "from_json - field access", and any unrecognized
   * name all share the same simple two-field input shape, so they collapse into the
   * default case.
   */
  private def generateJsonData(name: String) = name match {
    case "from_json - all primitive types" =>
      spark.sql(s"""
        SELECT
          concat(
            '{"i32":', CAST(value % 1000 AS STRING),
            ',"i64":', CAST(value * 1000000000L AS STRING),
            ',"f32":', CAST(value * 1.5 AS STRING),
            ',"f64":', CAST(value * 2.5 AS STRING),
            ',"bool":', CASE WHEN value % 2 = 0 THEN 'true' ELSE 'false' END,
            ',"str":"value_', CAST(value AS STRING), '"}'
          ) AS json_str
        FROM $tbl
      """)

    case "from_json - with nulls" =>
      // Mix of NULL rows, explicit JSON nulls, missing fields, and full records.
      spark.sql(s"""
        SELECT
          CASE
            WHEN value % 10 = 0 THEN NULL
            WHEN value % 5 = 0 THEN '{"a":null,"b":"test"}'
            WHEN value % 3 = 0 THEN '{"a":123}'
            ELSE concat('{"a":', CAST(value AS STRING), ',"b":"str_', CAST(value AS STRING), '"}')
          END AS json_str
        FROM $tbl
      """)

    case "from_json - nested struct" =>
      spark.sql(s"""
        SELECT
          concat(
            '{"outer":{"inner_a":', CAST(value AS STRING),
            ',"inner_b":"nested_', CAST(value AS STRING), '"}}') AS json_str
        FROM $tbl
      """)

    case _ =>
      spark.sql(simplePrimitivesJsonSql)
  }

  /**
   * Generic method to run a JSON expression benchmark with the given configuration.
   *
   * Three cases are measured against the same Parquet input: plain Spark, Comet with
   * only the scan enabled, and Comet with both scan and native execution enabled.
   *
   * @param config
   *   The benchmark case to run
   * @param values
   *   Number of input rows to generate
   */
  def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
    val benchmark = new Benchmark(config.name, values, output = output)

    withTempPath { dir =>
      withTempTable("parquetV1Table") {
        // Generate input data with the JSON pattern matching this benchmark case.
        prepareTable(dir, generateJsonData(config.name))

        benchmark.addCase("SQL Parquet - Spark") { _ =>
          spark.sql(config.query).noop()
        }

        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
            spark.sql(config.query).noop()
          }
        }

        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
          // JsonToStructs is marked incompatible, so it must be explicitly allowed.
          // Constant folding is disabled so the expression is not optimized away
          // before it reaches Comet.
          val baseConfigs =
            Map(
              CometConf.COMET_ENABLED.key -> "true",
              CometConf.COMET_EXEC_ENABLED.key -> "true",
              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
              "spark.sql.optimizer.constantFolding.enabled" -> "false")
          withSQLConf((baseConfigs ++ config.extraCometConfigs).toSeq: _*) {
            spark.sql(config.query).noop()
          }
        }

        benchmark.run()
      }
    }
  }

  // Configuration for all JSON expression benchmarks.
  private val jsonExpressions = List(
    JsonExprConfig(
      "from_json - simple primitives",
      "a INT, b STRING",
      "SELECT from_json(json_str, 'a INT, b STRING') FROM parquetV1Table"),
    JsonExprConfig(
      "from_json - all primitive types",
      "i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING",
      "SELECT from_json(json_str, 'i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING') FROM parquetV1Table"),
    JsonExprConfig(
      "from_json - with nulls",
      "a INT, b STRING",
      "SELECT from_json(json_str, 'a INT, b STRING') FROM parquetV1Table"),
    JsonExprConfig(
      "from_json - nested struct",
      "outer STRUCT<inner_a: INT, inner_b: STRING>",
      "SELECT from_json(json_str, 'outer STRUCT<inner_a: INT, inner_b: STRING>') FROM parquetV1Table"),
    JsonExprConfig(
      "from_json - field access",
      "a INT, b STRING",
      "SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"))

  /** Entry point: runs every configured JSON expression benchmark over 1M rows. */
  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
    val values = 1024 * 1024

    jsonExpressions.foreach { config =>
      runBenchmarkWithTable(config.name, values) { v =>
        runJsonExprBenchmark(config, v)
      }
    }
  }
}

0 commit comments

Comments
 (0)