Skip to content

Commit c6ffd1f

Browse files
committed
Merge branch-25.06 into merge-branch-25.06-to-main
2 parents 9dbb8eb + da0fab8 commit c6ffd1f

File tree

20 files changed

+503
-826
lines changed

20 files changed

+503
-826
lines changed

CHANGELOG.md

Lines changed: 218 additions & 181 deletions
Large diffs are not rendered by default.

delta-lake/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ and directory contains the corresponding support code.
1616
| 2.2.x | Spark 3.3.x | `delta-22x` |
1717
| 2.3.x | Spark 3.3.x | `delta-23x` |
1818
| 2.4.x | Spark 3.4.x | `delta-24x` |
19-
| 3.3.x | Spark 3.5.[3-] | `delta-33x` |
2019
| Databricks 12.2 | Databricks 12.2 | `delta-spark332db` |
2120
| Databricks 13.3 | Databricks 13.3 | `delta-spark341db` |
2221
| Databricks 14.3 | Databricks 14.3 | `delta-spark350db143` |

delta-lake/common/src/main/delta-io/scala/com/nvidia/spark/rapids/delta/GpuDeltaParquetFileFormat.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ trait GpuDeltaParquetFileFormat extends GpuReadParquetFileFormat {
3636
val columnMappingMode: DeltaColumnMappingMode
3737
val referenceSchema: StructType
3838

39+
/**
40+
* prepareSchema must only be used for parquet read.
41+
* It removes "PARQUET_FIELD_ID_METADATA_KEY" for name mapping mode, which addresses columns by
42+
* physical name instead of id.
43+
*/
3944
def prepareSchema(inputSchema: StructType): StructType = {
4045
val schema = DeltaColumnMapping.createPhysicalSchema(
4146
inputSchema, referenceSchema, columnMappingMode)

delta-lake/delta-33x/src/main/scala/com/nvidia/spark/rapids/delta/delta33x/GpuDelta33xParquetFileFormat.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,10 @@ case class GpuDelta33xParquetFileFormat(
7272
}
7373

7474
/**
75-
* prepareSchemaForRead must only be used for parquet read.
76-
* It removes "PARQUET_FIELD_ID_METADATA_KEY" for name mapping mode which address columns by
77-
* physical name instead of id.
75+
* This function is overridden as Delta 3.3 has an extra `PARQUET_FIELD_NESTED_IDS_METADATA_KEY`
76+
* key to remove from the metadata, which does not exist in earlier versions.
7877
*/
79-
def prepareSchemaForRead(inputSchema: StructType): StructType = {
78+
override def prepareSchema(inputSchema: StructType): StructType = {
8079
val schema = DeltaColumnMapping.createPhysicalSchema(
8180
inputSchema, referenceSchema, columnMappingMode)
8281
if (columnMappingMode == NameMapping) {
@@ -150,9 +149,9 @@ case class GpuDelta33xParquetFileFormat(
150149

151150
val dataReader = super.buildReaderWithPartitionValuesAndMetrics(
152151
sparkSession,
153-
prepareSchemaForRead(dataSchema),
154-
prepareSchemaForRead(partitionSchema),
155-
prepareSchemaForRead(requiredSchema),
152+
dataSchema,
153+
partitionSchema,
154+
requiredSchema,
156155
prepareFiltersForRead(filters),
157156
options,
158157
hadoopConf,

docs/archives/CHANGELOG_25.02.md

Lines changed: 182 additions & 0 deletions
Large diffs are not rendered by default.

docs/dev/mem_debug.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ be set to either `STDERR` or `STDOUT` to see everything that is happening with t
136136
```
137137

138138
The format of this is not really documented anywhere, but it uses the
139-
[logging_resource_adaptor](https://github.com/rapidsai/rmm/blob/main/include/rmm/mr/device/logging_resource_adaptor.hpp)
139+
[logging_resource_adaptor](https://github.com/rapidsai/rmm/blob/main/cpp/include/rmm/mr/device/logging_resource_adaptor.hpp)
140140
to log when an allocation succeeded or failed and when memory was freed. The current format
141141
appears to be.
142142

iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuReaderFactory.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.iceberg.parquet.{MultiFile, MultiThread, SingleFi
2323
import org.apache.iceberg.{FileFormat, ScanTask, ScanTaskGroup}
2424
import scala.collection.JavaConverters._
2525

26+
import org.apache.spark.internal.Logging
2627
import org.apache.spark.sql.catalyst.InternalRow
2728
import org.apache.spark.sql.connector.read.InputPartition
2829
import org.apache.spark.sql.connector.read.PartitionReader
@@ -31,7 +32,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
3132

3233
class GpuReaderFactory(private val metrics: Map[String, GpuMetric],
3334
rapidsConf: RapidsConf,
34-
queryUsesInputFile: Boolean) extends PartitionReaderFactory {
35+
queryUsesInputFile: Boolean) extends PartitionReaderFactory with Logging {
3536

3637
private val allCloudSchemes = rapidsConf.getCloudSchemes.toSet
3738
private val isParquetPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled
@@ -71,8 +72,13 @@ class GpuReaderFactory(private val metrics: Map[String, GpuMetric],
7172
val allParquet = scans.forall(_.file.format == FileFormat.PARQUET)
7273

7374
if (allParquet) {
74-
if (isParquetPerFileReadEnabled) {
75-
// If per-file read is enabled, we can only use single threaded reading.
75+
// If per-file read is enabled, we can only use single threaded reading.
76+
// We also disable the multi-threaded reader when deletions exist, as a quick workaround for
77+
// https://github.com/NVIDIA/spark-rapids/issues/12885
78+
if (isParquetPerFileReadEnabled || !hasNoDeletes) {
79+
if (!hasNoDeletes) {
80+
logWarning("Multithread iceberg parquet reader disabled with deletions")
81+
}
7682
return SingleFile
7783
}
7884

integration_tests/src/main/python/date_time_test.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -468,16 +468,27 @@ def test_to_utc_timestamp_fixed_offset(time_zone):
468468
assert_gpu_and_cpu_are_equal_collect(
469469
lambda spark: unary_op_df(spark, tz_timestamp_gen).selectExpr(f'to_utc_timestamp(a, "{time_zone}")'))
470470

471-
471+
# test from_utc_timestamp
472+
# If `end_timestamp` is 2200 year, then generated timestamps are < 2200 year, will use GPU to compute both DST and non-DST timezones.
473+
# If any generated timestamp is > year 2200 and the timezone is DST, then `fallback` to CPU.
474+
# Here `fallback` means the GPU operator invokes the CPU to compute; it does not truly fall back to the CPU.
472475
@pytest.mark.parametrize('time_zone', all_timezones, ids=idfn)
473-
def test_comprehensive_from_utc_timestamp(time_zone):
474-
tz_timestamp_gen = TimestampGen(tzinfo=timezone.utc)
476+
@pytest.mark.parametrize('end_timestamp', [last_supported_tz_time, None], ids=idfn)
477+
def test_comprehensive_from_utc_timestamp(time_zone, end_timestamp):
478+
# if end = None, will use the default value
479+
tz_timestamp_gen = TimestampGen(end = end_timestamp, tzinfo=timezone.utc)
475480
assert_gpu_and_cpu_are_equal_collect(
476481
lambda spark: unary_op_df(spark, tz_timestamp_gen).selectExpr(f'from_utc_timestamp(a, "{time_zone}")'))
477-
482+
483+
# test to_utc_timestamp
484+
# If `end_timestamp` is 2200 year, then generated timestamps are < 2200 year, will use GPU to compute both DST and non-DST timezones.
485+
# If any generated timestamp is > year 2200 and the timezone is DST, then `fallback` to CPU.
486+
# Here `fallback` means the GPU operator invokes the CPU to compute; it does not truly fall back to the CPU.
478487
@pytest.mark.parametrize('time_zone', all_timezones, ids=idfn)
479-
def test_comprehensive_to_utc_timestamp(time_zone):
480-
tz_timestamp_gen = TimestampGen(end=last_supported_tz_time, tzinfo=tz.gettz(time_zone))
488+
@pytest.mark.parametrize('end_timestamp', [last_supported_tz_time, None], ids=idfn)
489+
def test_comprehensive_to_utc_timestamp(time_zone, end_timestamp):
490+
# if end = None, will use the default value
491+
tz_timestamp_gen = TimestampGen(end=end_timestamp, tzinfo=tz.gettz(time_zone))
481492
assert_gpu_and_cpu_are_equal_collect(
482493
lambda spark: unary_op_df(spark, tz_timestamp_gen).selectExpr(f'to_utc_timestamp(a, "{time_zone}")'))
483494

integration_tests/src/main/python/iceberg/iceberg_merge_on_read_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def test_iceberg_v2_position_delete_with_url_encoded_path(spark_tmp_table_factor
8484
@iceberg
8585
@ignore_order(local=True)
8686
@pytest.mark.parametrize('reader_type', rapids_reader_types)
87+
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/12885')
8788
def test_iceberg_v2_mixed_deletes(spark_tmp_table_factory, spark_tmp_path, reader_type):
8889
# We use a fixed seed here to ensure that data deletion vector has been generated
8990
table_name = setup_base_iceberg_table(spark_tmp_table_factory,

0 commit comments

Comments
 (0)