|
19 | 19 |
|
20 | 20 | use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; |
21 | 21 | use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions}; |
22 | | -use arrow_schema::{DataType, Schema, SchemaRef}; |
| 22 | +use arrow_schema::{Schema, SchemaRef}; |
23 | 23 | use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; |
24 | | -use datafusion_comet_spark_expr::EvalMode; |
25 | | -use datafusion_common::plan_err; |
26 | 24 | use datafusion_expr::ColumnarValue; |
27 | | -use std::collections::HashMap; |
28 | 25 | use std::sync::Arc; |
29 | 26 |
|
30 | 27 | /// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible |
@@ -107,24 +104,11 @@ impl SchemaAdapter for SparkSchemaAdapter { |
107 | 104 | let mut field_mappings = vec![None; self.required_schema.fields().len()]; |
108 | 105 |
|
109 | 106 | for (file_idx, file_field) in file_schema.fields.iter().enumerate() { |
110 | | - if let Some((table_idx, table_field)) = |
| 107 | + if let Some((table_idx, _table_field)) = |
111 | 108 | self.required_schema.fields().find(file_field.name()) |
112 | 109 | { |
113 | | - if cast_supported( |
114 | | - file_field.data_type(), |
115 | | - table_field.data_type(), |
116 | | - &self.parquet_options, |
117 | | - ) { |
118 | | - field_mappings[table_idx] = Some(projection.len()); |
119 | | - projection.push(file_idx); |
120 | | - } else { |
121 | | - return plan_err!( |
122 | | - "Cannot cast file schema field {} of type {:?} to required schema field of type {:?}", |
123 | | - file_field.name(), |
124 | | - file_field.data_type(), |
125 | | - table_field.data_type() |
126 | | - ); |
127 | | - } |
| 110 | + field_mappings[table_idx] = Some(projection.len()); |
| 111 | + projection.push(file_idx); |
128 | 112 | } |
129 | 113 | } |
130 | 114 |
|
@@ -285,254 +269,6 @@ impl SchemaMapper for SchemaMapping { |
285 | 269 | } |
286 | 270 | } |
287 | 271 |
|
288 | | -/// Determine if Comet supports a cast, taking options such as EvalMode and Timezone into account. |
289 | | -fn cast_supported(from_type: &DataType, to_type: &DataType, options: &SparkParquetOptions) -> bool { |
290 | | - use DataType::*; |
291 | | - |
292 | | - let from_type = if let Dictionary(_, dt) = from_type { |
293 | | - dt |
294 | | - } else { |
295 | | - from_type |
296 | | - }; |
297 | | - |
298 | | - let to_type = if let Dictionary(_, dt) = to_type { |
299 | | - dt |
300 | | - } else { |
301 | | - to_type |
302 | | - }; |
303 | | - |
304 | | - if from_type == to_type { |
305 | | - return true; |
306 | | - } |
307 | | - |
308 | | - match (from_type, to_type) { |
309 | | - (Boolean, _) => can_convert_from_boolean(to_type, options), |
310 | | - (UInt8 | UInt16 | UInt32 | UInt64, Int8 | Int16 | Int32 | Int64) |
311 | | - if options.allow_cast_unsigned_ints => |
312 | | - { |
313 | | - true |
314 | | - } |
315 | | - (Int8, _) => can_convert_from_byte(to_type, options), |
316 | | - (Int16, _) => can_convert_from_short(to_type, options), |
317 | | - (Int32, _) => can_convert_from_int(to_type, options), |
318 | | - (Int64, _) => can_convert_from_long(to_type, options), |
319 | | - (Float32, _) => can_convert_from_float(to_type, options), |
320 | | - (Float64, _) => can_convert_from_double(to_type, options), |
321 | | - (Decimal128(p, s), _) => can_convert_from_decimal(p, s, to_type, options), |
322 | | - (Timestamp(_, None), _) => can_convert_from_timestamp_ntz(to_type, options), |
323 | | - (Timestamp(_, Some(_)), _) => can_convert_from_timestamp(to_type, options), |
324 | | - (Utf8 | LargeUtf8, _) => can_convert_from_string(to_type, options), |
325 | | - (_, Utf8 | LargeUtf8) => can_cast_to_string(from_type, options), |
326 | | - (Struct(from_fields), Struct(to_fields)) => { |
327 | | - // TODO some of this logic may be specific to converting Parquet to Spark |
328 | | - let mut field_types = HashMap::new(); |
329 | | - for field in from_fields { |
330 | | - field_types.insert(field.name(), field.data_type()); |
331 | | - } |
332 | | - if field_types.iter().len() != from_fields.len() { |
333 | | - return false; |
334 | | - } |
335 | | - for field in to_fields { |
336 | | - if let Some(from_type) = field_types.get(&field.name()) { |
337 | | - if !cast_supported(from_type, field.data_type(), options) { |
338 | | - return false; |
339 | | - } |
340 | | - } else { |
341 | | - return false; |
342 | | - } |
343 | | - } |
344 | | - true |
345 | | - } |
346 | | - _ => false, |
347 | | - } |
348 | | -} |
349 | | - |
350 | | -fn can_convert_from_string(to_type: &DataType, options: &SparkParquetOptions) -> bool { |
351 | | - use DataType::*; |
352 | | - match to_type { |
353 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Binary => true, |
354 | | - Float32 | Float64 => { |
355 | | - // https://github.com/apache/datafusion-comet/issues/326 |
356 | | - // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. |
357 | | - // Does not support ANSI mode. |
358 | | - options.allow_incompat |
359 | | - } |
360 | | - Decimal128(_, _) => { |
361 | | - // https://github.com/apache/datafusion-comet/issues/325 |
362 | | - // Does not support inputs ending with 'd' or 'f'. Does not support 'inf'. |
363 | | - // Does not support ANSI mode. Returns 0.0 instead of null if input contains no digits |
364 | | - |
365 | | - options.allow_incompat |
366 | | - } |
367 | | - Date32 | Date64 => { |
368 | | - // https://github.com/apache/datafusion-comet/issues/327 |
369 | | - // Only supports years between 262143 BC and 262142 AD |
370 | | - options.allow_incompat |
371 | | - } |
372 | | - Timestamp(_, _) if options.eval_mode == EvalMode::Ansi => { |
373 | | - // ANSI mode not supported |
374 | | - false |
375 | | - } |
376 | | - Timestamp(_, Some(tz)) if tz.as_ref() != "UTC" => { |
377 | | - // Cast will use UTC instead of $timeZoneId |
378 | | - options.allow_incompat |
379 | | - } |
380 | | - Timestamp(_, _) => { |
381 | | - // https://github.com/apache/datafusion-comet/issues/328 |
382 | | - // Not all valid formats are supported |
383 | | - options.allow_incompat |
384 | | - } |
385 | | - _ => false, |
386 | | - } |
387 | | -} |
388 | | - |
389 | | -fn can_cast_to_string(from_type: &DataType, options: &SparkParquetOptions) -> bool { |
390 | | - use DataType::*; |
391 | | - match from_type { |
392 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Date32 | Date64 | Timestamp(_, _) => true, |
393 | | - Float32 | Float64 => { |
394 | | - // There can be differences in precision. |
395 | | - // For example, the input \"1.4E-45\" will produce 1.0E-45 " + |
396 | | - // instead of 1.4E-45")) |
397 | | - true |
398 | | - } |
399 | | - Decimal128(_, _) => { |
400 | | - // https://github.com/apache/datafusion-comet/issues/1068 |
401 | | - // There can be formatting differences in some case due to Spark using |
402 | | - // scientific notation where Comet does not |
403 | | - true |
404 | | - } |
405 | | - Binary => { |
406 | | - // https://github.com/apache/datafusion-comet/issues/377 |
407 | | - // Only works for binary data representing valid UTF-8 strings |
408 | | - options.allow_incompat |
409 | | - } |
410 | | - Struct(fields) => fields |
411 | | - .iter() |
412 | | - .all(|f| can_cast_to_string(f.data_type(), options)), |
413 | | - _ => false, |
414 | | - } |
415 | | -} |
416 | | - |
417 | | -fn can_convert_from_timestamp_ntz(to_type: &DataType, options: &SparkParquetOptions) -> bool { |
418 | | - use DataType::*; |
419 | | - match to_type { |
420 | | - Timestamp(_, _) => true, |
421 | | - Date32 | Date64 | Utf8 => { |
422 | | - // incompatible |
423 | | - options.allow_incompat |
424 | | - } |
425 | | - _ => { |
426 | | - // unsupported |
427 | | - false |
428 | | - } |
429 | | - } |
430 | | -} |
431 | | - |
432 | | -fn can_convert_from_timestamp(to_type: &DataType, _options: &SparkParquetOptions) -> bool { |
433 | | - use DataType::*; |
434 | | - match to_type { |
435 | | - Timestamp(_, _) => true, |
436 | | - Boolean | Int8 | Int16 => { |
437 | | - // https://github.com/apache/datafusion-comet/issues/352 |
438 | | - // this seems like an edge case that isn't important for us to support |
439 | | - false |
440 | | - } |
441 | | - Int64 => { |
442 | | - // https://github.com/apache/datafusion-comet/issues/352 |
443 | | - true |
444 | | - } |
445 | | - Date32 | Date64 | Utf8 | Decimal128(_, _) => true, |
446 | | - _ => { |
447 | | - // unsupported |
448 | | - false |
449 | | - } |
450 | | - } |
451 | | -} |
452 | | - |
453 | | -fn can_convert_from_boolean(to_type: &DataType, _: &SparkParquetOptions) -> bool { |
454 | | - use DataType::*; |
455 | | - matches!(to_type, Int8 | Int16 | Int32 | Int64 | Float32 | Float64) |
456 | | -} |
457 | | - |
458 | | -fn can_convert_from_byte(to_type: &DataType, _: &SparkParquetOptions) -> bool { |
459 | | - use DataType::*; |
460 | | - matches!( |
461 | | - to_type, |
462 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _) |
463 | | - ) |
464 | | -} |
465 | | - |
466 | | -fn can_convert_from_short(to_type: &DataType, _: &SparkParquetOptions) -> bool { |
467 | | - use DataType::*; |
468 | | - matches!( |
469 | | - to_type, |
470 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Decimal128(_, _) |
471 | | - ) |
472 | | -} |
473 | | - |
474 | | -fn can_convert_from_int(to_type: &DataType, options: &SparkParquetOptions) -> bool { |
475 | | - use DataType::*; |
476 | | - match to_type { |
477 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 | Utf8 => true, |
478 | | - Decimal128(_, _) => { |
479 | | - // incompatible: no overflow check |
480 | | - options.allow_incompat |
481 | | - } |
482 | | - _ => false, |
483 | | - } |
484 | | -} |
485 | | - |
486 | | -fn can_convert_from_long(to_type: &DataType, options: &SparkParquetOptions) -> bool { |
487 | | - use DataType::*; |
488 | | - match to_type { |
489 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true, |
490 | | - Decimal128(_, _) => { |
491 | | - // incompatible: no overflow check |
492 | | - options.allow_incompat |
493 | | - } |
494 | | - _ => false, |
495 | | - } |
496 | | -} |
497 | | - |
498 | | -fn can_convert_from_float(to_type: &DataType, _: &SparkParquetOptions) -> bool { |
499 | | - use DataType::*; |
500 | | - matches!( |
501 | | - to_type, |
502 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float64 | Decimal128(_, _) |
503 | | - ) |
504 | | -} |
505 | | - |
506 | | -fn can_convert_from_double(to_type: &DataType, _: &SparkParquetOptions) -> bool { |
507 | | - use DataType::*; |
508 | | - matches!( |
509 | | - to_type, |
510 | | - Boolean | Int8 | Int16 | Int32 | Int64 | Float32 | Decimal128(_, _) |
511 | | - ) |
512 | | -} |
513 | | - |
514 | | -fn can_convert_from_decimal( |
515 | | - p1: &u8, |
516 | | - _s1: &i8, |
517 | | - to_type: &DataType, |
518 | | - options: &SparkParquetOptions, |
519 | | -) -> bool { |
520 | | - use DataType::*; |
521 | | - match to_type { |
522 | | - Int8 | Int16 | Int32 | Int64 | Float32 | Float64 => true, |
523 | | - Decimal128(p2, _) => { |
524 | | - if p2 < p1 { |
525 | | - // https://github.com/apache/datafusion/issues/13492 |
526 | | - // Incompatible(Some("Casting to smaller precision is not supported")) |
527 | | - options.allow_incompat |
528 | | - } else { |
529 | | - true |
530 | | - } |
531 | | - } |
532 | | - _ => false, |
533 | | - } |
534 | | -} |
535 | | - |
536 | 272 | #[cfg(test)] |
537 | 273 | mod test { |
538 | 274 | use crate::parquet::parquet_support::SparkParquetOptions; |
|
0 commit comments