
Support Hadoop SequenceFiles Scan #14061

Open
thirtiseven wants to merge 61 commits into NVIDIA:main from thirtiseven:seq_file_reader

Conversation

@thirtiseven
Collaborator

@thirtiseven thirtiseven commented Dec 23, 2025

Closes #14065

Description

This PR adds GPU-accelerated reading of Hadoop SequenceFiles in the RAPIDS Accelerator for Apache Spark. SequenceFile is commonly used as the storage format for serialized protobuf payloads (via from_protobuf). Previously, reading SequenceFiles was only possible through Spark's CPU-based RDD API (sc.newAPIHadoopFile), which becomes the I/O bottleneck when downstream decode (e.g., protobuf) runs on the GPU.

The implementation introduces a physical plan replacement strategy: when Spark's plan contains a SerializeFromObjectExec over an ExternalRDDScanExec backed by a simple SequenceFile RDD, the plugin replaces it with GpuSequenceFileSerializeFromObjectExec, which bypasses the original RDD and reads files directly using a multi-threaded reader with combine mode.

Key Design Decisions

  • CPU decode, GPU transfer: SequenceFile decoding uses Hadoop's SequenceFile.Reader on the CPU (the format is not amenable to GPU parsing). The decoded binary payloads are buffered in pinned host memory and transferred to the GPU as LIST<UINT8> columns. This means COALESCING and PERFILE reader modes are not supported (no benefit), and MULTITHREADED is the default.
  • RDD lineage reflection: The meta layer uses reflection on NewHadoopRDD / HadoopRDD to confirm that the RDD lineage is a simple SequenceFile scan with BytesWritable key/value. Complex lineages (e.g., filtered/mapped RDDs) automatically fall back to CPU.
  • Compression: Compressed SequenceFiles (record or block compression) are detected at planning time and fall back to CPU. Only uncompressed SequenceFiles are GPU-accelerated.
  • Combine mode: Small files are merged into larger batches via zero-copy GPU concatenation, improving throughput when reading many small SequenceFiles.
  • Old and New Hadoop API: Both sc.newAPIHadoopFile() and sc.hadoopFile() paths are supported.
  • Glob patterns: Input paths support Hadoop-style glob patterns (e.g., data/year=2024/*).
  • Error handling: Respects spark.sql.files.ignoreMissingFiles and spark.sql.files.ignoreCorruptFiles configurations.
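The planning-time compression check above can, in principle, be made from the file header alone: a SequenceFile header starts with the magic bytes "SEQ", a version byte, the key and value class names, and then two booleans for record and block compression. Here is an illustrative Python sketch of that idea (not the plugin's Scala implementation), simplified to assume class names shorter than 128 bytes so Hadoop's variable-length int is a single byte:

```python
def sequencefile_compression_flags(header: bytes):
    """Read the two compression booleans from a SequenceFile v6 header.
    Simplified sketch: assumes class-name lengths < 128, in which case
    Hadoop's vint encoding is a single byte equal to the length."""
    assert header[:3] == b"SEQ", "not a SequenceFile"
    version = header[3]
    pos = 4
    for _ in range(2):  # key class name, then value class name (Hadoop Text)
        name_len = header[pos]  # single-byte vint for short names
        pos += 1 + name_len
    compressed = bool(header[pos])        # record compression flag
    block_compressed = bool(header[pos + 1])  # block compression flag
    return compressed, block_compressed

# Synthetic header for an uncompressed BytesWritable key/value file:
cls = b"org.apache.hadoop.io.BytesWritable"
uncompressed_hdr = (b"SEQ" + bytes([6])
                    + bytes([len(cls)]) + cls
                    + bytes([len(cls)]) + cls
                    + bytes([0, 0]))  # compress=false, blockCompress=false
```

A file whose header reports either flag as true would be tagged for CPU fallback at planning time.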

New Files

File                                             | Description
GpuSequenceFileSerializeFromObjectExecMeta.scala | Planning meta: detects SequenceFile RDD lineage via reflection, validates schema/compression, collects input paths
GpuSequenceFileSerializeFromObjectExec.scala     | GPU exec node: file listing, partition creation, multi-threaded reader orchestration, CPU fallback path
sequencefile/GpuSequenceFileReaders.scala        | Core reader: HostBinaryListBufferer, SequenceFileChunk, MultiFileCloudSequenceFilePartitionReader, factory

New Configs

Config | Default | Description
spark.rapids.sql.format.sequencefile.reader.type | MULTITHREADED | Reader type (only MULTITHREADED and AUTO supported)
spark.rapids.sql.format.sequencefile.multiThreadedRead.maxNumFilesParallel | Integer.MAX_VALUE | Max files read in parallel per task
spark.rapids.sql.format.sequencefile.rddScan.physicalReplace.enabled | true | Enable/disable physical plan replacement
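A hypothetical way to wire these up (config names from the table above; the values here are illustrative — on a live SparkSession each pair would go through spark.conf.set):

```python
# Illustrative settings dict; keys are the new configs from this PR.
# maxNumFilesParallel is lowered from its Integer.MAX_VALUE default here
# purely as an example of bounding per-task host memory use.
seqfile_confs = {
    "spark.rapids.sql.format.sequencefile.reader.type": "MULTITHREADED",
    "spark.rapids.sql.format.sequencefile.multiThreadedRead.maxNumFilesParallel": "64",
    "spark.rapids.sql.format.sequencefile.rddScan.physicalReplace.enabled": "true",
}

# On a real session this would be applied as:
# for k, v in seqfile_confs.items():
#     spark.conf.set(k, v)
```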

Performance tests

val NUM_FILES = 200
val RECORDS_PER_FILE = 50000
val VALUE_SIZE = 1024
val ITERATIONS = 5

I ran performance tests on 200 files, each with 50,000 records and 1 KB per value, over 5 iterations.
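The measurement loop can be sketched as a tiny best-of-N timing harness (a hypothetical helper, not the actual benchmark script used to produce the numbers):

```python
import time

def best_of(fn, iterations=5):
    """Run fn `iterations` times (mirroring the ITERATIONS constant
    above) and keep the best wall-clock time."""
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        times.append(time.perf_counter() - start)
    return min(times)

def speedup(baseline_s, variant_s):
    """Speedup of a variant relative to the RDD-scan baseline."""
    return baseline_s / variant_s
```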


Since decoding happens on the CPU, the GPU path performs about the same as the CPU FileFormat path (the GPU variant additionally copies the decoded data to the device), but both are roughly 2x faster than the baseline CPU RDD scan.

Variant                   |            Time | Speedup vs RDD Scan
CPU value-only            |          1.24 s |           2.00x
GPU MT-8 threads          |          1.25 s |           1.98x
GPU MT-4 threads          |          1.27 s |           1.95x
GPU MT-2 threads          |          1.28 s |           1.94x
GPU MULTITHREADED         |          1.28 s |           1.94x
CPU FileFormat            |          1.35 s |           1.84x
GPU AUTO                  |          1.37 s |           1.82x
GPU PERFILE               |          1.37 s |           1.81x
GPU value-only            |          1.38 s |           1.80x
GPU COALESCING            |          1.41 s |           1.76x
RDD Scan                  |          2.49 s |        baseline

Test Coverage

Scala unit tests (SequenceFilePhysicalReplaceSuite, SequenceFileBinaryFileFormatSuite):

  • Physical plan replacement for uncompressed SequenceFile
  • CPU fallback for compressed input, complex RDD lineage, unsupported reader types
  • Glob patterns (simple and recursive)
  • Old Hadoop API (hadoopRDD) support
  • ignoreMissingFiles behavior
  • Corrupt file handling
  • Column name / schema semantics

Python integration tests (sequencefile_test.py):

  • Basic key/value read (GPU vs CPU correctness)
  • Value-only read (protobuf use case)
  • Empty files, large batches, large records
  • Combine mode correctness with many small files
  • Binary data patterns
  • Missing/corrupt file handling
  • Config-disabled fallback path

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Copilot AI (Contributor) left a comment

Pull request overview

This PR adds support for reading Hadoop SequenceFiles in the RAPIDS Accelerator for Apache Spark. It registers a new file format sequencefilebinary that reads SequenceFile key/value pairs as raw BinaryType columns on the GPU.

Key Changes

  • Introduces SequenceFileBinaryFileFormat as a new DataSource that reads SequenceFiles and exposes key/value as BinaryType columns
  • Implements GPU-accelerated reading via GpuReadSequenceFileBinaryFormat and associated partition readers
  • Integrates the new format into GpuFileSourceScanExec for GPU execution path routing

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.

File | Description
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormat.scala | CPU-side FileFormat implementation with row-based reader for SequenceFiles
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuReadSequenceFileBinaryFormat.scala | GPU-enabled FileFormat wrapper with metadata support and multi-file reader factory
sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala | Core GPU partition readers with host-side buffering and device column materialization
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala | Integration point registering SequenceFileBinary format in GPU scan execution
tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala | Test suite in tests module for wildcard discovery
sql-plugin/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala | Duplicate test suite in sql-plugin module


@thirtiseven
Collaborator Author

@Greptile full review

@greptile-apps
Contributor

greptile-apps bot commented Dec 23, 2025

Greptile Summary

This PR introduces GPU-accelerated reading of Hadoop SequenceFiles by replacing SerializeFromObjectExec over ExternalRDDScanExec with a new GpuSequenceFileSerializeFromObjectExec that bypasses the original RDD and reads files directly using a multi-threaded reader with combine mode. SequenceFile decoding remains on the CPU (Hadoop's SequenceFile.Reader), while the decoded binary payloads are transferred to the GPU as LIST<UINT8> columns, achieving ~2× throughput improvement over the baseline CPU RDD path according to the benchmark data.

Key issues found:

  • Compression sampling is insufficient (GpuSequenceFileSerializeFromObjectExecMeta.hasCompressedInput): Only the first file per input path is sampled at planning time. In a directory with mixed compressed/uncompressed files the planner approves GPU execution, but at task time UnsupportedSequenceFileCompressionException is thrown and explicitly re-thrown (not caught by ignoreCorruptFiles), causing unrecoverable task failures rather than a graceful per-file CPU fallback.
  • ignoreMissingFiles is silently ignored for glob patterns (GpuSequenceFileSerializeFromObjectExec.resolveInputStatuses): When a glob matches zero files the code always throws noMatchesError regardless of ignoreMissingFiles=true. Only non-glob missing paths respect this flag.
  • Resource leak on error in doCombineHmbs (MultiFileCloudSequenceFilePartitionReader): Once allKeyChunks/allValueChunks are extracted from the original holders (which are intentionally left unclosed), there is no closeOnExcept guard before the combined SequenceFileHostBuffersWithMetaData is constructed. An exception at that point leaks the extracted host memory chunks.
  • Global SerializeFromObjectExec registration (GpuOverrides): Registering this exec globally causes all non-SequenceFile uses to produce "will not work on GPU" messages in plan output, which can be confusing for users.
  • The GpuMultiFileReader changes that add exception-safe safeClose guards in getNextBuffersAndMeta are a clean improvement with no issues found.

Confidence Score: 2/5

  • Not safe to merge — the compression sampling gap can cause unrecoverable task failures on mixed-compression datasets, and the glob+ignoreMissingFiles logic is incorrect.
  • Two logic bugs block safe merging: (1) planning-time compression detection samples only one file per path, so mixed compressed/uncompressed directories pass planning but fail at task execution with an uncaught exception; (2) ignoreMissingFiles is not respected when a glob pattern resolves to zero files. Additionally there is a host memory leak risk in the combine path. The overall architecture, performance design, and test coverage are solid, but these correctness issues need to be resolved first.
  • Pay close attention to GpuSequenceFileSerializeFromObjectExecMeta.scala (compression sampling), GpuSequenceFileSerializeFromObjectExec.scala (glob+ignoreMissingFiles), and GpuSequenceFileReaders.scala (doCombineHmbs resource safety).

Important Files Changed

Filename | Overview
sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala | Core reader implementation: HostBinaryListBufferer, SequenceFileChunk, MultiFileCloudSequenceFilePartitionReader — resource leak risk in doCombineHmbs when building the combined result from detached chunks
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala | Planning meta with reflection-based RDD lineage detection and compression sampling — sampling only the first file per input path can miss compressed files in mixed datasets, causing unhandled runtime failures
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuSequenceFileSerializeFromObjectExec.scala | GPU exec node with file listing and multi-threaded reader orchestration — resolveInputStatuses does not respect ignoreMissingFiles for glob patterns that resolve to zero files
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | Registers SerializeFromObjectExec globally for GPU replacement — correct but causes verbose "will not work on GPU" messages for all non-SequenceFile uses of this exec
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala | Adds exception-safe cleanup for async reader buffers in getNextBuffersAndMeta and close() paths — solid defensive improvement with no issues found
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | Adds three new SequenceFile configs (reader type, max parallel files, physical replace toggle); PERFILE/COALESCING types throw from a lazy val — functional but slightly unusual pattern for config validation
tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala | Comprehensive unit tests covering compression fallback, glob patterns, old API, column name semantics — no test for glob+ignoreMissingFiles combination
tests/src/test/scala/com/nvidia/spark/rapids/SequenceFilePhysicalReplaceSuite.scala | Tests physical plan replacement for uncompressed files and ignoreMissingFiles behavior; test for ignoreMissingFiles uses only explicit paths, not glob patterns
integration_tests/src/main/python/sequencefile_test.py | Integration tests for key/value reads, value-only (protobuf) pattern, combine mode, binary patterns, and error handling — good coverage for happy-path and edge cases

Sequence Diagram

sequenceDiagram
    participant Planner as Spark Planner
    participant Meta as GpuSequenceFileSerializeFromObjectExecMeta
    participant Exec as GpuSequenceFileSerializeFromObjectExec
    participant Factory as GpuSequenceFileMultiFilePartitionReaderFactory
    participant Reader as MultiFileCloudSequenceFilePartitionReader
    participant HadoopFS as Hadoop FileSystem
    participant GPU as GPU (cuDF)

    Planner->>Meta: tagPlanForGpu()
    Meta->>Meta: isSimpleSequenceFileRDD() via reflection
    Meta->>HadoopFS: findAnyFile() — sample first file per path
    HadoopFS-->>Meta: first file
    Meta->>HadoopFS: isCompressedSequenceFile(firstFile)
    HadoopFS-->>Meta: compressed? → fall back to CPU
    Meta->>Meta: collectInputPaths() via reflection
    Meta-->>Planner: convertToGpu() → GpuSequenceFileSerializeFromObjectExec

    Planner->>Exec: internalDoExecuteColumnar()
    Exec->>HadoopFS: filePartitions — glob + list all files
    HadoopFS-->>Exec: FilePartition[]
    Exec->>Factory: create GpuSequenceFileMultiFilePartitionReaderFactory
    Factory->>Reader: buildBaseColumnarReaderForCloud(files)

    loop Per file (multithreaded)
        Reader->>HadoopFS: SequenceFile.Reader.nextRaw()
        HadoopFS-->>Reader: key/value bytes
        Reader->>Reader: HostBinaryListBufferer.addBytes()
        Reader->>Reader: SequenceFileChunk (host pinned memory)
    end

    Reader->>Reader: combineHMBs() — zero-copy chunk collection
    Reader->>GPU: buildDeviceColumnFromChunks() — H2D transfer
    GPU->>GPU: ColumnVector.concatenate() (combine mode)
    GPU-->>Exec: ColumnarBatch (LIST<UINT8>)

Comments Outside Diff (4)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSequenceFileSerializeFromObjectExecMeta.scala, line 270-280 (link)

    Compression sampling too conservative — mixed datasets silently fail at runtime

    hasCompressedInput calls findAnyFile, which returns only the first file found in each input path. If a directory contains a mix of compressed and uncompressed SequenceFiles, the planner may sample an uncompressed file, approve GPU execution, and then encounter a compressed file at task time.

    In ReadBatchRunner.callImpl, UnsupportedSequenceFileCompressionException is explicitly re-thrown (not handled by ignoreCorruptFiles), so the task will fail with an unhandled exception rather than falling back to CPU gracefully.

    case e: UnsupportedSequenceFileCompressionException => throw e

    To be safe, hasCompressedInput should scan all files (or at least a broader sample) in each input path, not just the first file. Alternatively, the reader should tolerate a per-file CPU fallback when compression is detected at execution time rather than propagating an unrecoverable failure.

  2. sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuSequenceFileSerializeFromObjectExec.scala, line 220-232 (link)

    ignoreMissingFiles is not respected for glob patterns

    When statuses is null/empty and the path looks like a glob, the code always throws noMatchesError, regardless of the ignoreMissingFiles flag. The flag is only consulted for non-glob paths.

    This means a user with spark.sql.files.ignoreMissingFiles=true and a glob path that resolves to zero files will receive an unexpected InvalidInputException instead of a quietly empty result. The test in SequenceFilePhysicalReplaceSuite only exercises a non-glob path for this case, so the glob+ignoreMissingFiles combination is not covered.

    if (statuses == null || statuses.isEmpty) {
      val pathStr = path.toString
      val looksLikeGlob = pathStr.exists(ch => ch == '*' || ch == '?' || ch == '[' || ch == '{')
      if (looksLikeGlob) {
        if (ignoreMissingFiles) {
          Array.empty[FileStatus]  // respect the flag for globs too
        } else {
          throw noMatchesError(path)
        }
      } else if (ignoreMissingFiles) {
        Array.empty[FileStatus]
      } else {
        throw new FileNotFoundException(s"Input path does not exist: $path")
      }
    }
  3. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 436-465 (link)

    Potential resource leak in doCombineHmbs — chunks not protected on error

    After allKeyChunks and allValueChunks are extracted from the toCombine holders via flatMap, the chunk objects are "detached" from their original parents (the parents are intentionally left unclosed). If any subsequent operation (e.g., creating SequenceFileHostBuffersWithMetaData, computing totalRows, etc.) throws before the combined result takes ownership of those chunks, both allKeyChunks and allValueChunks will be leaked because neither the unclosed originals nor the not-yet-created combined result will close them.

    A closeOnExcept guard should be wrapped around the construction of the combined result:

    val allKeyChunks = toCombine.flatMap(_.keyChunks)
    closeOnExcept(allKeyChunks) { _ =>
      val allValueChunks = toCombine.flatMap(_.valueChunks)
      closeOnExcept(allValueChunks) { _ =>
        val result = SequenceFileHostBuffersWithMetaData(...)
        toCombine.foreach(_.combineReleaseCallbacks(result))
        result
      }
    }
  4. sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala, line 4668-4671 (link)

    All SerializeFromObjectExec nodes will generate "will not work on GPU" messages

    Registering SerializeFromObjectExec globally means every use of this exec node — including those generated by Spark's Dataset encoder for completely unrelated operations — will be inspected by GpuSequenceFileSerializeFromObjectExecMeta.tagPlanForGpu(). For non-SequenceFile plans this always ends with a willNotWorkOnGpu(...) call (e.g., "SerializeFromObject child is not ExternalRDDScanExec"), which shows up in plan explanations and logs.

    Consider adding a fast-path check at the top of tagPlanForGpu() that silently tags the node as non-replaceable if the child is not an ExternalRDDScanExec, so that irrelevant SerializeFromObjectExec nodes don't generate noisy messages:

    override def tagPlanForGpu(): Unit = {
      wrapped.child match {
        case _: ExternalRDDScanExec[_] => // proceed with full checks
        case _ =>
          willNotWorkOnGpu("Not a SequenceFile RDD scan")
          return
      }
      ...
    }
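The closeOnExcept guard suggested in comment 3 has a direct analogue in Python's contextlib.ExitStack: register cleanups for every detached chunk, and cancel them only once the combined result has taken ownership. A minimal sketch (HostChunk is a stand-in type, not the plugin's class):

```python
from contextlib import ExitStack

class HostChunk:
    """Stand-in for a detached pinned host-memory chunk (hypothetical)."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def combine(chunks, build):
    """Close-on-except pattern: if `build` raises before the combined
    result takes ownership, every detached chunk is closed; on success
    the cleanups are cancelled and ownership transfers to the result."""
    with ExitStack() as stack:
        for c in chunks:
            stack.callback(c.close)   # cleanup runs only on failure
        result = build(chunks)
        stack.pop_all()               # success: cancel the cleanups
        return result
```

The same shape applies to allKeyChunks and allValueChunks in doCombineHmbs: wrap the construction of the combined result so a throw cannot strand the detached chunks.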

Last reviewed commit: 1b2621f

@greptile-apps greptile-apps bot (Contributor) left a comment

Additional Comments (4)

  1. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 125-128 (link)

    logic: check for potential INT32 overflow before it happens

    the check happens after dataLocation has already grown beyond Int.MaxValue, which could cause issues during the buffer growth operations in addBytes or addValueBytes. move the overflow check earlier in those methods before updating dataLocation.

  2. sql-plugin/src/main/scala/com/nvidia/spark/rapids/sequencefile/GpuSequenceFileReaders.scala, line 73-83 (link)

    logic: potential integer overflow in row capacity calculation

    rowsAllocated * 2 can overflow when rowsAllocated is close to Int.MaxValue / 2. this causes the allocation to wrap to negative or small values.

  3. tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala, line 32-136 (link)

    style: missing test coverage for key scenarios

    add tests for:

    • compressed SequenceFiles (should throw UnsupportedOperationException)
    • multi-file reads to verify the multi-file reader path
    • large batches that exceed maxRowsPerBatch or maxBytesPerBatch
    • partition columns
    • reading only key or only value (not both)
    • empty files


  4. tests/src/test/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormatSuite.scala, line 102-136 (link)

    style: tests only verify the CPU reader path, not GPU

    this test uses SequenceFileBinaryFileFormat which is the CPU fallback. to test the GPU path with GpuReadSequenceFileBinaryFormat, you'd need to enable the Rapids plugin configuration and verify GPU execution.
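The overflow concern in comment 2 amounts to guarding capacity doubling against 32-bit wraparound: in JVM Int arithmetic, rowsAllocated * 2 wraps negative near Int.MaxValue. A sketch of the safe growth pattern in Python (the JVM limit is emulated with an explicit clamp; names are illustrative):

```python
INT_MAX = 2**31 - 1  # JVM Int.MaxValue

def grow_capacity(rows_allocated: int, needed: int) -> int:
    """Double the capacity until it covers `needed`, clamping at
    INT_MAX instead of letting `cap * 2` wrap negative as 32-bit
    arithmetic would. Rejects impossible requests up front, before
    any allocation happens."""
    if needed > INT_MAX:
        raise OverflowError("row count exceeds Int.MaxValue")
    cap = max(rows_allocated, 1)
    while cap < needed:
        # cap * 2 would exceed INT_MAX exactly when cap > INT_MAX // 2
        cap = INT_MAX if cap > INT_MAX // 2 else cap * 2
    return cap
```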

5 files reviewed, 4 comments


@sameerz sameerz requested a review from mythrocks December 23, 2025 21:57
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 12 comments.



Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 13 comments.



@thirtiseven
Collaborator Author

@greptileai full review

@thirtiseven
Collaborator Author

@greptile review
@thirtiseven thirtiseven self-assigned this Mar 16, 2026
@thirtiseven thirtiseven marked this pull request as ready for review March 16, 2026 07:20


Development

Successfully merging this pull request may close these issues.

[FEA] Support reading Hadoop SequenceFiles on GPU

4 participants