Skip to content

Commit fd53edb

Browse files
authored
feat: Add partial support for from_json (#2934)
1 parent 3dcd9ad commit fd53edb

File tree

15 files changed

+1098
-4
lines changed

15 files changed

+1098
-4
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ jobs:
161161
org.apache.comet.CometStringExpressionSuite
162162
org.apache.comet.CometBitwiseExpressionSuite
163163
org.apache.comet.CometMapExpressionSuite
164+
org.apache.comet.CometJsonExpressionSuite
164165
org.apache.comet.expressions.conditional.CometIfSuite
165166
org.apache.comet.expressions.conditional.CometCoalesceSuite
166167
org.apache.comet.expressions.conditional.CometCaseWhenSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ jobs:
126126
org.apache.comet.CometStringExpressionSuite
127127
org.apache.comet.CometBitwiseExpressionSuite
128128
org.apache.comet.CometMapExpressionSuite
129+
org.apache.comet.CometJsonExpressionSuite
129130
org.apache.comet.expressions.conditional.CometIfSuite
130131
org.apache.comet.expressions.conditional.CometCoalesceSuite
131132
org.apache.comet.expressions.conditional.CometCaseWhenSuite

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ These settings can be used to determine which parts of the plan are accelerated
264264
| `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true |
265265
| `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true |
266266
| `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
267+
| `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration for `JsonToStructs` | true |
267268
| `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
268269
| `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
269270
| `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |

native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/execution/expressions/strings.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ use datafusion::common::ScalarValue;
2525
use datafusion::physical_expr::expressions::{LikeExpr, Literal};
2626
use datafusion::physical_expr::PhysicalExpr;
2727
use datafusion_comet_proto::spark_expression::Expr;
28-
use datafusion_comet_spark_expr::{RLike, SubstringExpr};
28+
use datafusion_comet_spark_expr::{FromJson, RLike, SubstringExpr};
2929

3030
use crate::execution::{
3131
expressions::extract_expr,
3232
operators::ExecutionError,
3333
planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
34+
serde::to_arrow_datatype,
3435
};
3536

3637
/// Builder for Substring expressions
@@ -98,3 +99,27 @@ impl ExpressionBuilder for RlikeBuilder {
9899
}
99100
}
100101
}
102+
103+
pub struct FromJsonBuilder;
104+
105+
impl ExpressionBuilder for FromJsonBuilder {
106+
fn build(
107+
&self,
108+
spark_expr: &Expr,
109+
input_schema: SchemaRef,
110+
planner: &PhysicalPlanner,
111+
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
112+
let expr = extract_expr!(spark_expr, FromJson);
113+
let child = planner.create_expr(
114+
expr.child.as_ref().ok_or_else(|| {
115+
ExecutionError::GeneralError("FromJson missing child".to_string())
116+
})?,
117+
input_schema,
118+
)?;
119+
let schema =
120+
to_arrow_datatype(expr.schema.as_ref().ok_or_else(|| {
121+
ExecutionError::GeneralError("FromJson missing schema".to_string())
122+
})?);
123+
Ok(Arc::new(FromJson::new(child, schema, &expr.timezone)))
124+
}
125+
}

native/core/src/execution/planner/expression_registry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ pub enum ExpressionType {
9494
CreateNamedStruct,
9595
GetStructField,
9696
ToJson,
97+
FromJson,
9798
ToPrettyString,
9899
ListExtract,
99100
GetArrayStructFields,
@@ -281,6 +282,8 @@ impl ExpressionRegistry {
281282
.insert(ExpressionType::Like, Box::new(LikeBuilder));
282283
self.builders
283284
.insert(ExpressionType::Rlike, Box::new(RlikeBuilder));
285+
self.builders
286+
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
284287
}
285288

286289
/// Extract expression type from Spark protobuf expression
@@ -336,6 +339,7 @@ impl ExpressionRegistry {
336339
Some(ExprStruct::CreateNamedStruct(_)) => Ok(ExpressionType::CreateNamedStruct),
337340
Some(ExprStruct::GetStructField(_)) => Ok(ExpressionType::GetStructField),
338341
Some(ExprStruct::ToJson(_)) => Ok(ExpressionType::ToJson),
342+
Some(ExprStruct::FromJson(_)) => Ok(ExpressionType::FromJson),
339343
Some(ExprStruct::ToPrettyString(_)) => Ok(ExpressionType::ToPrettyString),
340344
Some(ExprStruct::ListExtract(_)) => Ok(ExpressionType::ListExtract),
341345
Some(ExprStruct::GetArrayStructFields(_)) => Ok(ExpressionType::GetArrayStructFields),

native/proto/src/proto/expr.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ message Expr {
8585
Rand randn = 62;
8686
EmptyExpr spark_partition_id = 63;
8787
EmptyExpr monotonically_increasing_id = 64;
88+
FromJson from_json = 89;
8889
}
8990
}
9091

@@ -268,6 +269,12 @@ message ToJson {
268269
bool ignore_null_fields = 6;
269270
}
270271

272+
message FromJson {
273+
Expr child = 1;
274+
DataType schema = 2;
275+
string timezone = 3;
276+
}
277+
271278
enum BinaryOutputStyle {
272279
UTF8 = 0;
273280
BASIC = 1;

native/spark-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ datafusion = { workspace = true }
3333
chrono-tz = { workspace = true }
3434
num = { workspace = true }
3535
regex = { workspace = true }
36+
serde_json = "1.0"
3637
thiserror = { workspace = true }
3738
futures = { workspace = true }
3839
twox-hash = "2.1.2"

0 commit comments

Comments
 (0)