Merged
22 commits
0ea8d12 Introduce BinaryOutputStyle from Spark 4.0 (hsiang-c, Sep 11, 2025)
5366d66 Allow casting from binary to string (hsiang-c, Sep 11, 2025)
89f70bb Pass binaryOutputStyle to query plan serde (hsiang-c, Sep 11, 2025)
018e6b7 Take binaryOutputStyle in planner (hsiang-c, Sep 11, 2025)
ec8e333 Implement Spark-style ToPrettyString (hsiang-c, Sep 11, 2025)
e0edca2 Match file name w/ test name (hsiang-c, Sep 11, 2025)
18ae1ad Test all 5 BinaryOutputStyle in Spark 4.0 (hsiang-c, Sep 11, 2025)
cac6e7e Fix package: 'org.apache.sql' -> 'org.apache.spark.sql' (hsiang-c, Sep 11, 2025)
d1ff945 Add CometToPrettyStringSuite back to CI (hsiang-c, Sep 11, 2025)
d6edd8e Specify binaryOutputStyle for Spark 3.4 (hsiang-c, Sep 12, 2025)
39031fb Let Comet deal with non pretty string casting (hsiang-c, Sep 12, 2025)
374d113 Enable binary to string casting test (hsiang-c, Sep 12, 2025)
0e7e4ec Merge branch 'main' into pretty_string (hsiang-c, Sep 15, 2025)
47deeb0 Attempt to fix the build; ToPrettyString is Spark 3.5+ (hsiang-c, Sep 15, 2025)
199133d Removed resolved issues (hsiang-c, Sep 17, 2025)
7663644 Type casting only function (hsiang-c, Sep 17, 2025)
ae1ce38 Extract test setup logic to CometFuzzTestBase (hsiang-c, Sep 17, 2025)
4697399 Merge branch 'main' into pretty_string (hsiang-c, Sep 17, 2025)
308af77 Move binary_output_style proto <-> enum mapping to core (hsiang-c, Sep 17, 2025)
ce0d8f0 Move BinaryOutputStyle from cast.rs to lib.rs (hsiang-c, Sep 17, 2025)
2c7d465 Remove incorrect comments (hsiang-c, Sep 17, 2025)
66b331a Merge branch 'main' into pretty_string (hsiang-c, Sep 19, 2025)
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_linux.yml
@@ -149,6 +149,9 @@ jobs:
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.objectstore.NativeConfigSuite
- name: "sql"
value: |
${{ matrix.profile.maven_opts != 'Spark 3.4, JDK 11, Scala 2.12' && 'org.apache.spark.sql.CometToPrettyStringSuite' || ''}}
fail-fast: false
name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}]
runs-on: ${{ matrix.os }}
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_macos.yml
@@ -114,6 +114,9 @@ jobs:
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.objectstore.NativeConfigSuite
- name: "sql"
value: |
${{ matrix.profile.maven_opts != 'Spark 3.4, JDK 11, Scala 2.12' && 'org.apache.spark.sql.CometToPrettyStringSuite' || ''}}
fail-fast: false
name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}]
runs-on: ${{ matrix.os }}
5 changes: 1 addition & 4 deletions dev/ci/check-suites.py
@@ -36,10 +36,7 @@ def file_to_class_name(path: Path) -> str | None:
"org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite
"org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract
"org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract
"org.apache.comet.exec.CometColumnarShuffleSuite", # abstract
# TODO add CometToPrettyStringSuite to PR worklows
# https://github.com/apache/datafusion-comet/issues/2307
"org.apache.spark.sql.CometToPrettyStringSuite"
"org.apache.comet.exec.CometColumnarShuffleSuite" # abstract
]

for workflow_filename in [".github/workflows/pr_build_linux.yml", ".github/workflows/pr_build_macos.yml"]:
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/compatibility.md
@@ -210,6 +210,7 @@ The following cast operations are generally compatible with Spark except for the
| string | long | |
| string | binary | |
| string | date | Only supports years between 262143 BC and 262142 AD |
| binary | string | |
| date | string | |
| timestamp | long | |
| timestamp | string | |
@@ -233,7 +234,6 @@ The following cast operations are not compatible with Spark for all inputs and a
| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
| string | timestamp | Not all valid formats are supported |
| binary | string | Only works for binary data representing valid UTF-8 strings |
<!--END:INCOMPAT_CAST_TABLE-->

### Unsupported Casts
2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

18 changes: 16 additions & 2 deletions native/core/src/execution/planner.rs
@@ -62,8 +62,8 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_modulo_expr, create_negate_expr, BloomFilterAgg,
BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
create_comet_physical_fun, create_modulo_expr, create_negate_expr, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
};

use crate::execution::operators::ExecutionError::GeneralError;
@@ -809,6 +809,8 @@ impl PhysicalPlanner {
SparkCastOptions::new(EvalMode::Try, &expr.timezone, true);
let null_string = "NULL";
spark_cast_options.null_string = null_string.to_string();
spark_cast_options.binary_output_style =
from_protobuf_binary_output_style(expr.binary_output_style).ok();
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let cast = Arc::new(Cast::new(
Arc::clone(&child),
@@ -2693,6 +2695,18 @@ fn create_case_expr(
}
}

fn from_protobuf_binary_output_style(
value: i32,
) -> Result<BinaryOutputStyle, prost::UnknownEnumValue> {
match spark_expression::BinaryOutputStyle::try_from(value)? {
spark_expression::BinaryOutputStyle::Utf8 => Ok(BinaryOutputStyle::Utf8),
spark_expression::BinaryOutputStyle::Basic => Ok(BinaryOutputStyle::Basic),
spark_expression::BinaryOutputStyle::Base64 => Ok(BinaryOutputStyle::Base64),
spark_expression::BinaryOutputStyle::Hex => Ok(BinaryOutputStyle::Hex),
spark_expression::BinaryOutputStyle::HexDiscrete => Ok(BinaryOutputStyle::HexDiscrete),
}
}

fn literal_to_array_ref(
data_type: DataType,
list_literal: ListLiteral,
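Worth noting in the planner hunk above: the `.ok()` means an out-of-range enum tag does not fail planning; it simply leaves `binary_output_style` as `None`, and the cast then falls back to the plain UTF-8 formatter (`cast_binary_formatter` in cast.rs below). A minimal standalone sketch of that fallback mapping; the names mirror the PR, but this is illustrative, not the actual module:

```rust
// Illustrative sketch of the proto-tag -> enum fallback; not the PR's code.
#[derive(Debug, PartialEq, Clone, Copy)]
enum BinaryOutputStyle {
    Utf8,
    Basic,
    Base64,
    Hex,
    HexDiscrete,
}

// Mirrors from_protobuf_binary_output_style(..).ok(): unknown tags become None.
fn style_from_tag(value: i32) -> Option<BinaryOutputStyle> {
    match value {
        0 => Some(BinaryOutputStyle::Utf8),
        1 => Some(BinaryOutputStyle::Basic),
        2 => Some(BinaryOutputStyle::Base64),
        3 => Some(BinaryOutputStyle::Hex),
        4 => Some(BinaryOutputStyle::HexDiscrete),
        _ => None, // planner keeps SparkCastOptions::binary_output_style unset
    }
}

fn main() {
    assert_eq!(style_from_tag(4), Some(BinaryOutputStyle::HexDiscrete));
    assert_eq!(style_from_tag(99), None);
}
```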
9 changes: 9 additions & 0 deletions native/proto/src/proto/expr.proto
@@ -269,9 +269,18 @@ message ToJson {
bool ignore_null_fields = 6;
}

enum BinaryOutputStyle {
UTF8 = 0;
BASIC = 1;
BASE64 = 2;
HEX = 3;
HEX_DISCRETE = 4;
}

message ToPrettyString {
Expr child = 1;
string timezone = 2;
BinaryOutputStyle binaryOutputStyle = 3;
}

message Hour {
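A proto3 nuance that explains the shim changes further down in this diff: an omitted enum field decodes as tag 0, which here means `UTF8`, so the Scala side must set `binaryOutputStyle` explicitly to get Spark's pre-4.0 `HEX_DISCRETE` behavior. A small round-trip sketch, assuming the `prost` crate the native crates already use; the demo types below are hypothetical stand-ins for the generated ones:

```rust
// Hypothetical stand-ins for the prost-generated types; illustrative only.
use prost::Message;

#[derive(Clone, Copy, Debug, PartialEq, Eq, prost::Enumeration)]
#[repr(i32)]
enum BinaryOutputStyleDemo {
    Utf8 = 0,
    Basic = 1,
    Base64 = 2,
    Hex = 3,
    HexDiscrete = 4,
}

#[derive(Clone, PartialEq, Message)]
struct ToPrettyStringDemo {
    #[prost(string, tag = "2")]
    timezone: String,
    #[prost(enumeration = "BinaryOutputStyleDemo", tag = "3")]
    binary_output_style: i32,
}

fn main() {
    // Field left at its proto3 default (0): round-trips as UTF8.
    let implicit = ToPrettyStringDemo {
        timezone: "UTC".to_string(),
        binary_output_style: 0,
    };
    let decoded = ToPrettyStringDemo::decode(implicit.encode_to_vec().as_slice()).unwrap();
    assert_eq!(decoded.binary_output_style, BinaryOutputStyleDemo::Utf8 as i32);

    // The pre-4.0 shims therefore set HEX_DISCRETE explicitly.
    let explicit = ToPrettyStringDemo {
        timezone: "UTC".to_string(),
        binary_output_style: BinaryOutputStyleDemo::HexDiscrete as i32,
    };
    let decoded = ToPrettyStringDemo::decode(explicit.encode_to_vec().as_slice()).unwrap();
    assert_eq!(decoded.binary_output_style, BinaryOutputStyleDemo::HexDiscrete as i32);
}
```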
2 changes: 2 additions & 0 deletions native/spark-expr/Cargo.toml
@@ -37,6 +37,8 @@ thiserror = { workspace = true }
futures = { workspace = true }
twox-hash = "2.1.2"
rand = { workspace = true }
hex = "0.4.3"
base64 = "0.22.1"

[dev-dependencies]
arrow = {workspace = true}
89 changes: 81 additions & 8 deletions native/spark-expr/src/conversion_funcs/cast.rs
@@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use crate::timezone;
use crate::utils::array_with_timezone;
use crate::{timezone, BinaryOutputStyle};
use crate::{EvalMode, SparkError, SparkResult};
use arrow::array::builder::StringBuilder;
use arrow::array::{DictionaryArray, StringArray, StructArray};
use arrow::array::{DictionaryArray, GenericByteArray, StringArray, StructArray};
use arrow::compute::can_cast_types;
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType, Schema};
use arrow::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, GenericBinaryType, Schema,
};
use arrow::{
array::{
cast::AsArray,
@@ -60,6 +62,8 @@ use std::{
sync::Arc,
};

use base64::prelude::*;

static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");

const MICROS_PER_SECOND: i64 = 1000000;
@@ -260,11 +264,7 @@ fn can_cast_to_string(from_type: &DataType, options: &SparkCastOptions) -> bool
// scientific notation where Comet does not
true
}
Binary => {
// https://github.com/apache/datafusion-comet/issues/377
// Only works for binary data representing valid UTF-8 strings
options.allow_incompat
}
Binary => true,
Struct(fields) => fields
.iter()
.all(|f| can_cast_to_string(f.data_type(), options)),
@@ -816,6 +816,8 @@ pub struct SparkCastOptions {
pub is_adapting_schema: bool,
/// String to use to represent null values
pub null_string: String,
/// SparkSQL's binaryOutputStyle
pub binary_output_style: Option<BinaryOutputStyle>,
}

impl SparkCastOptions {
@@ -827,6 +829,7 @@ impl SparkCastOptions {
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
null_string: "null".to_string(),
binary_output_style: None,
}
}

@@ -838,6 +841,7 @@ impl SparkCastOptions {
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
null_string: "null".to_string(),
binary_output_style: None,
}
}
}
@@ -1027,6 +1031,7 @@ fn cast_array(
{
Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
}
(Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
_ if cast_options.is_adapting_schema
|| is_datafusion_spark_compatible(from_type, to_type, cast_options.allow_incompat) =>
{
@@ -1045,6 +1050,74 @@ fn cast_array(
Ok(spark_cast_postprocess(cast_result?, from_type, to_type))
}

fn cast_binary_to_string<O: OffsetSizeTrait>(
array: &dyn Array,
spark_cast_options: &SparkCastOptions,
) -> Result<ArrayRef, ArrowError> {
let input = array
.as_any()
.downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
.unwrap();

fn binary_formatter(value: &[u8], spark_cast_options: &SparkCastOptions) -> String {
match spark_cast_options.binary_output_style {
Some(s) => spark_binary_formatter(value, s),
None => cast_binary_formatter(value),
}
}

let output_array = input
.iter()
.map(|value| match value {
Some(value) => Ok(Some(binary_formatter(value, spark_cast_options))),
_ => Ok(None),
})
.collect::<Result<GenericStringArray<O>, ArrowError>>()?;
Ok(Arc::new(output_array))
}

/// Mimics Spark SQL's [`BinaryFormatter`](https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala#L449-L468),
/// used by the `ToPrettyString` expression.
/// `BinaryFormatter` was introduced in Spark 4.0.0 ([SPARK-47911](https://issues.apache.org/jira/browse/SPARK-47911));
/// before Spark 4.0.0, the default was `SPACE_DELIMITED_UPPERCASE_HEX`.
fn spark_binary_formatter(value: &[u8], binary_output_style: BinaryOutputStyle) -> String {
match binary_output_style {
BinaryOutputStyle::Utf8 => String::from_utf8(value.to_vec()).unwrap(),
BinaryOutputStyle::Basic => {
format!(
"{:?}",
value
.iter()
.map(|v| i8::from_ne_bytes([*v]))
.collect::<Vec<i8>>()
)
}
BinaryOutputStyle::Base64 => BASE64_STANDARD_NO_PAD.encode(value),
BinaryOutputStyle::Hex => value
.iter()
.map(|v| hex::encode_upper([*v]))
.collect::<String>(),
BinaryOutputStyle::HexDiscrete => {
// Spark's default SPACE_DELIMITED_UPPERCASE_HEX
format!(
"[{}]",
value
.iter()
.map(|v| hex::encode_upper([*v]))
.collect::<Vec<String>>()
.join(" ")
)
}
}
}

fn cast_binary_formatter(value: &[u8]) -> String {
match String::from_utf8(value.to_vec()) {
Ok(value) => value,
Err(_) => unsafe { String::from_utf8_unchecked(value.to_vec()) },
}
}

/// Determines if DataFusion supports the given cast in a way that is
/// compatible with Spark
fn is_datafusion_spark_compatible(
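To make the five styles concrete, here is a hedged, standalone sketch of what `spark_binary_formatter` above produces for the input bytes `[1, 2, 3]`, assuming the same `hex` 0.4 and `base64` 0.22 crates this PR adds to `native/spark-expr/Cargo.toml`. One caveat visible in the diff: the `Utf8` arm calls `String::from_utf8(..).unwrap()`, so that style assumes the binary value is valid UTF-8.

```rust
// Illustrative expectations for each BinaryOutputStyle; not part of the PR.
use base64::prelude::*;

fn main() {
    let value: &[u8] = &[1, 2, 3];

    // Basic: bytes reinterpreted as signed i8 and Debug-printed.
    let basic = format!(
        "{:?}",
        value.iter().map(|v| i8::from_ne_bytes([*v])).collect::<Vec<i8>>()
    );
    assert_eq!(basic, "[1, 2, 3]");

    // Base64: standard alphabet, no padding.
    assert_eq!(BASE64_STANDARD_NO_PAD.encode(value), "AQID");

    // Hex: contiguous uppercase hex digits.
    let hex_str: String = value.iter().map(|v| hex::encode_upper([*v])).collect();
    assert_eq!(hex_str, "010203");

    // HexDiscrete: Spark's pre-4.0 default, space-delimited uppercase hex.
    let hex_discrete = format!(
        "[{}]",
        value
            .iter()
            .map(|v| hex::encode_upper([*v]))
            .collect::<Vec<String>>()
            .join(" ")
    );
    assert_eq!(hex_discrete, "[01 02 03]");
}
```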
9 changes: 9 additions & 0 deletions native/spark-expr/src/lib.rs
@@ -98,6 +98,15 @@ pub enum EvalMode {
Try,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum BinaryOutputStyle {
Utf8,
Basic,
Base64,
Hex,
HexDiscrete,
}

pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
SparkError::ArithmeticOverflow {
from_type: from_type.to_string(),
@@ -215,8 +215,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
"There can be formatting differences in some case due to Spark using " +
"scientific notation where Comet does not"))
case DataTypes.BinaryType =>
// https://github.com/apache/datafusion-comet/issues/377
Incompatible(Some("Only works for binary data representing valid UTF-8 strings"))
Compatible()
case StructType(fields) =>
for (field <- fields) {
isSupported(field.dataType, DataTypes.StringType, timeZoneId, evalMode) match {
@@ -716,6 +716,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
.newBuilder()
.setChild(p)
.setTimezone(timezoneId.getOrElse("UTC"))
.setBinaryOutputStyle(binaryOutputStyle)
.build()
Some(
ExprOuterClass.Expr
@@ -20,7 +20,7 @@ package org.apache.comet.shims

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.spark.sql.catalyst.expressions._

/**
@@ -30,6 +30,8 @@ trait CometExprShim extends CommonStringExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE

def versionSpecificExprToProtoInternal(
expr: Expression,
inputs: Seq[Attribute],
@@ -20,7 +20,7 @@ package org.apache.comet.shims

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.spark.sql.catalyst.expressions._

/**
@@ -30,6 +30,8 @@ trait CometExprShim extends CommonStringExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

protected def binaryOutputStyle: BinaryOutputStyle = BinaryOutputStyle.HEX_DISCRETE

def versionSpecificExprToProtoInternal(
expr: Expression,
inputs: Seq[Attribute],
@@ -20,9 +20,10 @@ package org.apache.comet.shims

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.types.StringTypeWithCollation
import org.apache.spark.sql.types.{BinaryType, BooleanType, StringType}

@@ -33,6 +34,16 @@ trait CometExprShim extends CommonStringExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

protected def binaryOutputStyle: BinaryOutputStyle = {
SQLConf.get.getConf(SQLConf.BINARY_OUTPUT_STYLE).map(SQLConf.BinaryOutputStyle.withName) match {
case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8
case Some(SQLConf.BinaryOutputStyle.BASIC) => BinaryOutputStyle.BASIC
case Some(SQLConf.BinaryOutputStyle.BASE64) => BinaryOutputStyle.BASE64
case Some(SQLConf.BinaryOutputStyle.HEX) => BinaryOutputStyle.HEX
case _ => BinaryOutputStyle.HEX_DISCRETE
}
}

def versionSpecificExprToProtoInternal(
expr: Expression,
inputs: Seq[Attribute],
3 changes: 1 addition & 2 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -827,8 +827,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

// CAST from BinaryType

ignore("cast BinaryType to StringType") {
// https://github.com/apache/datafusion-comet/issues/377
test("cast BinaryType to StringType") {
castTest(generateBinary(), DataTypes.StringType)
}
