Skip to content
Open
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
2ab5557
AI draft for protocol buffer support
thirtiseven Dec 19, 2025
d261358
AI draft for Hadoop sequence file reader
thirtiseven Dec 22, 2025
cd31fad
Revert "AI draft for protocol buffer support"
thirtiseven Dec 23, 2025
1278c66
clean up
thirtiseven Dec 23, 2025
10933b0
update tests
thirtiseven Dec 23, 2025
e965b01
address comment
thirtiseven Dec 25, 2025
f89f8c1
address comment
thirtiseven Dec 25, 2025
02c0752
address comment
thirtiseven Dec 25, 2025
f3bcf9d
address comment
thirtiseven Dec 25, 2025
562672a
copyrights
thirtiseven Jan 4, 2026
2e10fbd
refactor
thirtiseven Jan 5, 2026
572c0da
address comments
thirtiseven Jan 6, 2026
9b1162e
address comments
thirtiseven Jan 6, 2026
f95910f
address comments
thirtiseven Jan 6, 2026
481bfbe
multi-thread reader
thirtiseven Jan 7, 2026
bd526c5
delete perf test
thirtiseven Jan 7, 2026
cf33cf4
address commens
thirtiseven Jan 8, 2026
af43f3e
address comments
thirtiseven Jan 8, 2026
288152a
remove COALESCING reader
thirtiseven Jan 8, 2026
ea91eab
fix
thirtiseven Jan 8, 2026
6a23c2e
fix
thirtiseven Jan 8, 2026
9847f16
make sequence file isSplitable to false due to data diff
thirtiseven Jan 9, 2026
c6b98fb
Merge branch 'seq_file_reader' of https://github.com/thirtiseven/spar…
thirtiseven Jan 9, 2026
70ad202
fix merge seqreader
thirtiseven Jan 9, 2026
f9f4a8c
use gpu reader
thirtiseven Jan 13, 2026
e6322bc
fix a bug
thirtiseven Jan 13, 2026
94f31ea
performance optimization
thirtiseven Jan 16, 2026
4139c00
fix
thirtiseven Jan 16, 2026
1b2fbe9
Revert "fix"
thirtiseven Jan 20, 2026
310ccbc
Revert "performance optimization"
thirtiseven Jan 20, 2026
81ccdfa
Revert "fix a bug"
thirtiseven Jan 20, 2026
28d0405
Revert "use gpu reader"
thirtiseven Jan 20, 2026
dcf6af0
fix OOM bug
thirtiseven Jan 20, 2026
87f5a72
performance optimzation
thirtiseven Jan 20, 2026
bcfcbc7
integration tests
thirtiseven Jan 21, 2026
7cc02cf
splitable true by default
thirtiseven Jan 21, 2026
143fc3d
logical rule
thirtiseven Jan 23, 2026
dc0bbfc
save a memory copy
thirtiseven Jan 26, 2026
9619bc0
fix perfile config
thirtiseven Jan 27, 2026
d052441
GPU combine to produce larger batch
thirtiseven Jan 28, 2026
98ee00f
support glob style path
thirtiseven Feb 2, 2026
e4fef5a
a bug fix, RDD conversion refinement
thirtiseven Feb 3, 2026
0f8f8ca
support compress
thirtiseven Feb 4, 2026
c60c978
huge upmerge from dev branch
thirtiseven Feb 25, 2026
18015d6
refactor
thirtiseven Feb 26, 2026
1b297b9
Merge remote-tracking branch 'origin/main' into seq_file_reader
thirtiseven Mar 12, 2026
3fe2cd6
fix scala 2.13 build
thirtiseven Mar 12, 2026
ed7fa84
verify and refactor
thirtiseven Mar 12, 2026
05c42a6
address comments
thirtiseven Mar 13, 2026
9934152
style
thirtiseven Mar 13, 2026
4fab896
style refactor
thirtiseven Mar 13, 2026
a6120f4
verify
thirtiseven Mar 13, 2026
4c3bdf6
verify back 330
thirtiseven Mar 13, 2026
bdc343c
address commmit
thirtiseven Mar 14, 2026
d8de16a
address commmit
thirtiseven Mar 14, 2026
98fcaba
address commmit
thirtiseven Mar 14, 2026
f7cb695
address commmit
thirtiseven Mar 15, 2026
9288e9f
address comments
thirtiseven Mar 15, 2026
069d36c
address commmit
thirtiseven Mar 16, 2026
3c0f35d
address commmit
thirtiseven Mar 16, 2026
1b2621f
address commmit
thirtiseven Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
440 changes: 440 additions & 0 deletions integration_tests/src/main/python/sequencefile_test.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.nvidia.spark.rapids.SequenceFileBinaryFileFormat
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.sequencefile.GpuSequenceFileMultiFilePartitionReaderFactory
import com.nvidia.spark.rapids.sequencefile.GpuSequenceFilePartitionReaderFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
* A FileFormat that allows reading Hadoop SequenceFiles and returning raw key/value bytes as
* Spark SQL BinaryType columns.
*
* This is a GPU-enabled scan format in the sense that it returns GPU-backed ColumnarBatch output
* (the parsing itself is CPU-side IO + byte parsing).
*/
class GpuReadSequenceFileBinaryFormat extends FileFormat with GpuReadFileFormatWithMetrics {

override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = Some(SequenceFileBinaryFileFormat.dataSchema)

// TODO: Fix split boundary handling to enable multi-partition reads
// Currently disabled to ensure correct record counts
override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = false

override def buildReaderWithPartitionValuesAndMetrics(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration,
metrics: Map[String, GpuMetric]): PartitionedFile => Iterator[InternalRow] = {
val sqlConf = sparkSession.sessionState.conf
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val rapidsConf = new RapidsConf(sqlConf)

val factory = GpuSequenceFilePartitionReaderFactory(
sqlConf,
broadcastedHadoopConf,
requiredSchema,
partitionSchema,
rapidsConf,
metrics,
options)
PartitionReaderIterator.buildReader(factory)
}

// Default to multi-file reads (recommended for many small files).
override def isPerFileReadEnabled(conf: RapidsConf): Boolean = false

override def createMultiFileReaderFactory(
broadcastedConf: Broadcast[SerializableConfiguration],
pushedFilters: Array[Filter],
fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {
GpuSequenceFileMultiFilePartitionReaderFactory(
fileScan.conf,
broadcastedConf,
fileScan.requiredSchema,
fileScan.readPartitionSchema,
fileScan.rapidsConf,
fileScan.allMetrics,
fileScan.queryUsesInputFile)
}
}

object GpuReadSequenceFileBinaryFormat {
def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val fsse = meta.wrapped
val required = fsse.requiredSchema
// Only support reading BinaryType columns named "key" and/or "value".
required.fields.foreach { f =>
val isKey = f.name.equalsIgnoreCase(SequenceFileBinaryFileFormat.KEY_FIELD)
val isValue = f.name.equalsIgnoreCase(SequenceFileBinaryFileFormat.VALUE_FIELD)
if ((isKey || isValue) && f.dataType != org.apache.spark.sql.types.BinaryType) {
meta.willNotWorkOnGpu(
s"SequenceFileBinary only supports BinaryType for " +
s"'${SequenceFileBinaryFileFormat.KEY_FIELD}' and " +
s"'${SequenceFileBinaryFileFormat.VALUE_FIELD}' columns, but saw " +
s"${f.name}: ${f.dataType.catalogString}")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
* Copyright (c) 2019-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1678,6 +1678,33 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValue(v => v > 0, "The maximum number of files must be greater than 0.")
.createWithDefault(Integer.MAX_VALUE)

val SEQUENCEFILE_READER_TYPE = conf("spark.rapids.sql.format.sequencefile.reader.type")
.doc("Sets the SequenceFile reader type. Since SequenceFile decoding happens on the CPU " +
"(using Hadoop's SequenceFile.Reader), COALESCING mode is not supported and will throw " +
"an exception. Use PERFILE which individually reads files, or MULTITHREADED which uses " +
"multiple threads to read files in parallel, utilizing multiple CPU cores for I/O and " +
"decoding. MULTITHREADED is recommended when reading many files as it allows the CPU to " +
"keep reading while GPU is also doing work. " +
s"See $MULTITHREAD_READ_NUM_THREADS and " +
"spark.rapids.sql.format.sequencefile.multiThreadedRead.maxNumFilesParallel to control " +
"the number of threads and amount of memory used. " +
"By default this is set to AUTO which selects MULTITHREADED for cloud storage and " +
"PERFILE for local storage. See spark.rapids.cloudSchemes.")
.stringConf
.transform(_.toUpperCase(java.util.Locale.ROOT))
.checkValues(RapidsReaderType.values.map(_.toString))
.createWithDefault(RapidsReaderType.AUTO.toString)

val SEQUENCEFILE_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL =
conf("spark.rapids.sql.format.sequencefile.multiThreadedRead.maxNumFilesParallel")
.doc("A limit on the maximum number of files per task processed in parallel on the CPU " +
"side before the file is sent to the GPU. This affects the amount of host memory used " +
"when reading the files in parallel. Used with MULTITHREADED reader, see " +
s"$SEQUENCEFILE_READER_TYPE.")
.integerConf
.checkValue(v => v > 0, "The maximum number of files must be greater than 0.")
.createWithDefault(Integer.MAX_VALUE)

val ENABLE_DELTA_WRITE = conf("spark.rapids.sql.format.delta.write.enabled")
.doc("When set to false disables Delta Lake output acceleration.")
.booleanConf
Expand Down Expand Up @@ -3548,6 +3575,26 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val maxNumAvroFilesParallel: Int = get(AVRO_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL)

lazy val isSequenceFilePerFileReadEnabled: Boolean = {
val readerType = RapidsReaderType.withName(get(SEQUENCEFILE_READER_TYPE))
if (readerType == RapidsReaderType.COALESCING) {
throw new IllegalArgumentException(
s"COALESCING reader type is not supported for SequenceFile. " +
s"SequenceFile decoding happens on CPU, so coalescing provides no benefit. " +
s"Use PERFILE, MULTITHREADED, or AUTO instead.")
}
readerType == RapidsReaderType.PERFILE
}

lazy val isSequenceFileAutoReaderEnabled: Boolean =
RapidsReaderType.withName(get(SEQUENCEFILE_READER_TYPE)) == RapidsReaderType.AUTO

lazy val isSequenceFileMultiThreadReadEnabled: Boolean = isSequenceFileAutoReaderEnabled ||
RapidsReaderType.withName(get(SEQUENCEFILE_READER_TYPE)) == RapidsReaderType.MULTITHREADED

lazy val maxNumSequenceFilesParallel: Int = get(
SEQUENCEFILE_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL)

lazy val isDeltaWriteEnabled: Boolean = get(ENABLE_DELTA_WRITE)

lazy val isIcebergEnabled: Boolean = get(ENABLE_ICEBERG)
Expand Down
Loading
Loading