Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ jobs:
org.apache.comet.CometStringExpressionSuite
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometJsonExpressionSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
org.apache.comet.expressions.conditional.CometCaseWhenSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ jobs:
org.apache.comet.CometStringExpressionSuite
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometJsonExpressionSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
org.apache.comet.expressions.conditional.CometCaseWhenSuite
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true |
| `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true |
| `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
| `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration for `JsonToStructs` | true |
| `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
| `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
| `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |
Expand Down
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion native/core/src/execution/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use datafusion::common::ScalarValue;
use datafusion::physical_expr::expressions::{LikeExpr, Literal};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_proto::spark_expression::Expr;
use datafusion_comet_spark_expr::{RLike, SubstringExpr};
use datafusion_comet_spark_expr::{FromJson, RLike, SubstringExpr};

use crate::execution::{
expressions::extract_expr,
operators::ExecutionError,
planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
serde::to_arrow_datatype,
};

/// Builder for Substring expressions
Expand Down Expand Up @@ -98,3 +99,27 @@ impl ExpressionBuilder for RlikeBuilder {
}
}
}

/// Builder for FromJson expressions (Spark's `from_json` / `JsonToStructs`).
pub struct FromJsonBuilder;

impl ExpressionBuilder for FromJsonBuilder {
    /// Translates a serialized Spark `FromJson` expression into a native
    /// [`FromJson`] physical expression.
    ///
    /// Returns `ExecutionError::GeneralError` when the protobuf message is
    /// missing its required `child` or `schema` field.
    fn build(
        &self,
        spark_expr: &Expr,
        input_schema: SchemaRef,
        planner: &PhysicalPlanner,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
        let from_json = extract_expr!(spark_expr, FromJson);

        // The JSON string input column/expression to be parsed.
        let child_proto = from_json
            .child
            .as_ref()
            .ok_or_else(|| ExecutionError::GeneralError("FromJson missing child".to_string()))?;
        let child_expr = planner.create_expr(child_proto, input_schema)?;

        // Target data type the JSON text is parsed into.
        let schema_proto = from_json
            .schema
            .as_ref()
            .ok_or_else(|| ExecutionError::GeneralError("FromJson missing schema".to_string()))?;
        let target_type = to_arrow_datatype(schema_proto);

        Ok(Arc::new(FromJson::new(
            child_expr,
            target_type,
            &from_json.timezone,
        )))
    }
}
4 changes: 4 additions & 0 deletions native/core/src/execution/planner/expression_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub enum ExpressionType {
CreateNamedStruct,
GetStructField,
ToJson,
FromJson,
ToPrettyString,
ListExtract,
GetArrayStructFields,
Expand Down Expand Up @@ -281,6 +282,8 @@ impl ExpressionRegistry {
.insert(ExpressionType::Like, Box::new(LikeBuilder));
self.builders
.insert(ExpressionType::Rlike, Box::new(RlikeBuilder));
self.builders
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
}

/// Extract expression type from Spark protobuf expression
Expand Down Expand Up @@ -336,6 +339,7 @@ impl ExpressionRegistry {
Some(ExprStruct::CreateNamedStruct(_)) => Ok(ExpressionType::CreateNamedStruct),
Some(ExprStruct::GetStructField(_)) => Ok(ExpressionType::GetStructField),
Some(ExprStruct::ToJson(_)) => Ok(ExpressionType::ToJson),
Some(ExprStruct::FromJson(_)) => Ok(ExpressionType::FromJson),
Some(ExprStruct::ToPrettyString(_)) => Ok(ExpressionType::ToPrettyString),
Some(ExprStruct::ListExtract(_)) => Ok(ExpressionType::ListExtract),
Some(ExprStruct::GetArrayStructFields(_)) => Ok(ExpressionType::GetArrayStructFields),
Expand Down
7 changes: 7 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ message Expr {
Rand randn = 62;
EmptyExpr spark_partition_id = 63;
EmptyExpr monotonically_increasing_id = 64;
FromJson from_json = 89;
}
}

Expand Down Expand Up @@ -268,6 +269,12 @@ message ToJson {
bool ignore_null_fields = 6;
}

// Spark's `from_json` (JsonToStructs): parses a JSON string expression
// into a value of the given target data type.
message FromJson {
// The input expression producing the JSON string to parse.
Expr child = 1;
// Target data type to parse the JSON into.
DataType schema = 2;
// NOTE(review): presumably the session timezone used when parsing
// timestamp values — confirm against the Scala serde side.
string timezone = 3;
}

enum BinaryOutputStyle {
UTF8 = 0;
BASIC = 1;
Expand Down
1 change: 1 addition & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ datafusion = { workspace = true }
chrono-tz = { workspace = true }
num = { workspace = true }
regex = { workspace = true }
serde_json = "1.0"
thiserror = { workspace = true }
futures = { workspace = true }
twox-hash = "2.1.2"
Expand Down
Loading
Loading