Skip to content

Commit 34daa54

Browse files
authored
fix: regressions in CometToPrettyStringSuite (#2384)
* Introduce BinaryOutputStyle from Spark 4.0 * Allow casting from binary to string * Pass binaryOutputStyle to query plan serde * Take binaryOutputStyle in planner * Implement Spark-style ToPrettyString * Match file name w/ test name * Test all 5 BinaryOutputStyle in Spark 4.0 * Fix package: 'org.apache.sql' -> 'org.apache.spark.sql' * Add CometToPrettyStringSuite back to CI * Specify binaryOutputStyle for Spark 3.4 * Let Comet deal with non pretty string casting * Enable binary to string casting test * Attempt to fix the build; ToPrettyString is Spark 3.5+ * Removed resolved issues * Type casting only function * Extract test setup logic to CometFuzzTestBase * Move binary_output_style proto <-> enum mapping to core * Move BinaryOutputStyle from cast.rs to lib.rs * Remove incorrect comments
1 parent c1e1120 commit 34daa54

File tree

19 files changed

+215
-68
lines changed

19 files changed

+215
-68
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ jobs:
149149
org.apache.comet.CometBitwiseExpressionSuite
150150
org.apache.comet.CometMapExpressionSuite
151151
org.apache.comet.objectstore.NativeConfigSuite
152+
- name: "sql"
153+
value: |
154+
${{ matrix.profile.maven_opts != 'Spark 3.4, JDK 11, Scala 2.12' && 'org.apache.spark.sql.CometToPrettyStringSuite' || ''}}
152155
fail-fast: false
153156
name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}]
154157
runs-on: ${{ matrix.os }}

.github/workflows/pr_build_macos.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ jobs:
114114
org.apache.comet.CometBitwiseExpressionSuite
115115
org.apache.comet.CometMapExpressionSuite
116116
org.apache.comet.objectstore.NativeConfigSuite
117+
- name: "sql"
118+
value: |
119+
${{ matrix.profile.maven_opts != 'Spark 3.4, JDK 11, Scala 2.12' && 'org.apache.spark.sql.CometToPrettyStringSuite' || ''}}
117120
fail-fast: false
118121
name: ${{ matrix.os }}/${{ matrix.profile.name }} [${{ matrix.suite.name }}]
119122
runs-on: ${{ matrix.os }}

dev/ci/check-suites.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ def file_to_class_name(path: Path) -> str | None:
3636
"org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite
3737
"org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract
3838
"org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract
39-
"org.apache.comet.exec.CometColumnarShuffleSuite", # abstract
40-
# TODO add CometToPrettyStringSuite to PR worklows
41-
# https://github.com/apache/datafusion-comet/issues/2307
42-
"org.apache.spark.sql.CometToPrettyStringSuite"
39+
"org.apache.comet.exec.CometColumnarShuffleSuite" # abstract
4340
]
4441

4542
for workflow_filename in [".github/workflows/pr_build_linux.yml", ".github/workflows/pr_build_macos.yml"]:

docs/source/user-guide/latest/compatibility.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ The following cast operations are generally compatible with Spark except for the
210210
| string | long | |
211211
| string | binary | |
212212
| string | date | Only supports years between 262143 BC and 262142 AD |
213+
| binary | string | |
213214
| date | string | |
214215
| timestamp | long | |
215216
| timestamp | string | |
@@ -233,7 +234,6 @@ The following cast operations are not compatible with Spark for all inputs and a
233234
| string | double | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. |
234235
| string | decimal | Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
235236
| string | timestamp | Not all valid formats are supported |
236-
| binary | string | Only works for binary data representing valid UTF-8 strings |
237237
<!--END:INCOMPAT_CAST_TABLE-->
238238

239239
### Unsupported Casts

native/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/execution/planner.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ use datafusion::{
6262
prelude::SessionContext,
6363
};
6464
use datafusion_comet_spark_expr::{
65-
create_comet_physical_fun, create_modulo_expr, create_negate_expr, BloomFilterAgg,
66-
BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
65+
create_comet_physical_fun, create_modulo_expr, create_negate_expr, BinaryOutputStyle,
66+
BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
6767
};
6868

6969
use crate::execution::operators::ExecutionError::GeneralError;
@@ -809,6 +809,8 @@ impl PhysicalPlanner {
809809
SparkCastOptions::new(EvalMode::Try, &expr.timezone, true);
810810
let null_string = "NULL";
811811
spark_cast_options.null_string = null_string.to_string();
812+
spark_cast_options.binary_output_style =
813+
from_protobuf_binary_output_style(expr.binary_output_style).ok();
812814
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
813815
let cast = Arc::new(Cast::new(
814816
Arc::clone(&child),
@@ -2693,6 +2695,18 @@ fn create_case_expr(
26932695
}
26942696
}
26952697

2698+
fn from_protobuf_binary_output_style(
2699+
value: i32,
2700+
) -> Result<BinaryOutputStyle, prost::UnknownEnumValue> {
2701+
match spark_expression::BinaryOutputStyle::try_from(value)? {
2702+
spark_expression::BinaryOutputStyle::Utf8 => Ok(BinaryOutputStyle::Utf8),
2703+
spark_expression::BinaryOutputStyle::Basic => Ok(BinaryOutputStyle::Basic),
2704+
spark_expression::BinaryOutputStyle::Base64 => Ok(BinaryOutputStyle::Base64),
2705+
spark_expression::BinaryOutputStyle::Hex => Ok(BinaryOutputStyle::Hex),
2706+
spark_expression::BinaryOutputStyle::HexDiscrete => Ok(BinaryOutputStyle::HexDiscrete),
2707+
}
2708+
}
2709+
26962710
fn literal_to_array_ref(
26972711
data_type: DataType,
26982712
list_literal: ListLiteral,

native/proto/src/proto/expr.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,18 @@ message ToJson {
269269
bool ignore_null_fields = 6;
270270
}
271271

272+
enum BinaryOutputStyle {
273+
UTF8 = 0;
274+
BASIC = 1;
275+
BASE64 = 2;
276+
HEX = 3;
277+
HEX_DISCRETE = 4;
278+
}
279+
272280
message ToPrettyString {
273281
Expr child = 1;
274282
string timezone = 2;
283+
BinaryOutputStyle binaryOutputStyle = 3;
275284
}
276285

277286
message Hour {

native/spark-expr/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ thiserror = { workspace = true }
3737
futures = { workspace = true }
3838
twox-hash = "2.1.2"
3939
rand = { workspace = true }
40+
hex = "0.4.3"
41+
base64 = "0.22.1"
4042

4143
[dev-dependencies]
4244
arrow = {workspace = true}

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::timezone;
1918
use crate::utils::array_with_timezone;
19+
use crate::{timezone, BinaryOutputStyle};
2020
use crate::{EvalMode, SparkError, SparkResult};
2121
use arrow::array::builder::StringBuilder;
22-
use arrow::array::{DictionaryArray, StringArray, StructArray};
22+
use arrow::array::{DictionaryArray, GenericByteArray, StringArray, StructArray};
2323
use arrow::compute::can_cast_types;
24-
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType, Schema};
24+
use arrow::datatypes::{
25+
ArrowDictionaryKeyType, ArrowNativeType, DataType, GenericBinaryType, Schema,
26+
};
2527
use arrow::{
2628
array::{
2729
cast::AsArray,
@@ -60,6 +62,8 @@ use std::{
6062
sync::Arc,
6163
};
6264

65+
use base64::prelude::*;
66+
6367
static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
6468

6569
const MICROS_PER_SECOND: i64 = 1000000;
@@ -260,11 +264,7 @@ fn can_cast_to_string(from_type: &DataType, options: &SparkCastOptions) -> bool
260264
// scientific notation where Comet does not
261265
true
262266
}
263-
Binary => {
264-
// https://github.com/apache/datafusion-comet/issues/377
265-
// Only works for binary data representing valid UTF-8 strings
266-
options.allow_incompat
267-
}
267+
Binary => true,
268268
Struct(fields) => fields
269269
.iter()
270270
.all(|f| can_cast_to_string(f.data_type(), options)),
@@ -816,6 +816,8 @@ pub struct SparkCastOptions {
816816
pub is_adapting_schema: bool,
817817
/// String to use to represent null values
818818
pub null_string: String,
819+
/// SparkSQL's binaryOutputStyle
820+
pub binary_output_style: Option<BinaryOutputStyle>,
819821
}
820822

821823
impl SparkCastOptions {
@@ -827,6 +829,7 @@ impl SparkCastOptions {
827829
allow_cast_unsigned_ints: false,
828830
is_adapting_schema: false,
829831
null_string: "null".to_string(),
832+
binary_output_style: None,
830833
}
831834
}
832835

@@ -838,6 +841,7 @@ impl SparkCastOptions {
838841
allow_cast_unsigned_ints: false,
839842
is_adapting_schema: false,
840843
null_string: "null".to_string(),
844+
binary_output_style: None,
841845
}
842846
}
843847
}
@@ -1027,6 +1031,7 @@ fn cast_array(
10271031
{
10281032
Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
10291033
}
1034+
(Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
10301035
_ if cast_options.is_adapting_schema
10311036
|| is_datafusion_spark_compatible(from_type, to_type, cast_options.allow_incompat) =>
10321037
{
@@ -1045,6 +1050,74 @@ fn cast_array(
10451050
Ok(spark_cast_postprocess(cast_result?, from_type, to_type))
10461051
}
10471052

1053+
fn cast_binary_to_string<O: OffsetSizeTrait>(
1054+
array: &dyn Array,
1055+
spark_cast_options: &SparkCastOptions,
1056+
) -> Result<ArrayRef, ArrowError> {
1057+
let input = array
1058+
.as_any()
1059+
.downcast_ref::<GenericByteArray<GenericBinaryType<O>>>()
1060+
.unwrap();
1061+
1062+
fn binary_formatter(value: &[u8], spark_cast_options: &SparkCastOptions) -> String {
1063+
match spark_cast_options.binary_output_style {
1064+
Some(s) => spark_binary_formatter(value, s),
1065+
None => cast_binary_formatter(value),
1066+
}
1067+
}
1068+
1069+
let output_array = input
1070+
.iter()
1071+
.map(|value| match value {
1072+
Some(value) => Ok(Some(binary_formatter(value, spark_cast_options))),
1073+
_ => Ok(None),
1074+
})
1075+
.collect::<Result<GenericStringArray<O>, ArrowError>>()?;
1076+
Ok(Arc::new(output_array))
1077+
}
1078+
1079+
/// This function mimics the [BinaryFormatter]: https://github.com/apache/spark/blob/v4.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ToStringBase.scala#L449-L468
1080+
/// used by SparkSQL's ToPrettyString expression.
1081+
/// The BinaryFormatter was [introduced]: https://issues.apache.org/jira/browse/SPARK-47911 in Spark 4.0.0
1082+
/// Before Spark 4.0.0, the default is SPACE_DELIMITED_UPPERCASE_HEX
1083+
fn spark_binary_formatter(value: &[u8], binary_output_style: BinaryOutputStyle) -> String {
1084+
match binary_output_style {
1085+
BinaryOutputStyle::Utf8 => String::from_utf8(value.to_vec()).unwrap(),
1086+
BinaryOutputStyle::Basic => {
1087+
format!(
1088+
"{:?}",
1089+
value
1090+
.iter()
1091+
.map(|v| i8::from_ne_bytes([*v]))
1092+
.collect::<Vec<i8>>()
1093+
)
1094+
}
1095+
BinaryOutputStyle::Base64 => BASE64_STANDARD_NO_PAD.encode(value),
1096+
BinaryOutputStyle::Hex => value
1097+
.iter()
1098+
.map(|v| hex::encode_upper([*v]))
1099+
.collect::<String>(),
1100+
BinaryOutputStyle::HexDiscrete => {
1101+
// Spark's default SPACE_DELIMITED_UPPERCASE_HEX
1102+
format!(
1103+
"[{}]",
1104+
value
1105+
.iter()
1106+
.map(|v| hex::encode_upper([*v]))
1107+
.collect::<Vec<String>>()
1108+
.join(" ")
1109+
)
1110+
}
1111+
}
1112+
}
1113+
1114+
fn cast_binary_formatter(value: &[u8]) -> String {
1115+
match String::from_utf8(value.to_vec()) {
1116+
Ok(value) => value,
1117+
Err(_) => unsafe { String::from_utf8_unchecked(value.to_vec()) },
1118+
}
1119+
}
1120+
10481121
/// Determines if DataFusion supports the given cast in a way that is
10491122
/// compatible with Spark
10501123
fn is_datafusion_spark_compatible(

native/spark-expr/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ pub enum EvalMode {
9898
Try,
9999
}
100100

101+
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
102+
pub enum BinaryOutputStyle {
103+
Utf8,
104+
Basic,
105+
Base64,
106+
Hex,
107+
HexDiscrete,
108+
}
109+
101110
pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
102111
SparkError::ArithmeticOverflow {
103112
from_type: from_type.to_string(),

0 commit comments

Comments (0)