Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
2ab5557
AI draft for protocol buffer support
thirtiseven Dec 19, 2025
d261358
AI draft for Hadoop sequence file reader
thirtiseven Dec 22, 2025
cd31fad
Revert "AI draft for protocol buffer support"
thirtiseven Dec 23, 2025
1278c66
clean up
thirtiseven Dec 23, 2025
10933b0
update tests
thirtiseven Dec 23, 2025
e965b01
address comment
thirtiseven Dec 25, 2025
f89f8c1
address comment
thirtiseven Dec 25, 2025
02c0752
address comment
thirtiseven Dec 25, 2025
f3bcf9d
address comment
thirtiseven Dec 25, 2025
562672a
copyrights
thirtiseven Jan 4, 2026
2e10fbd
refactor
thirtiseven Jan 5, 2026
572c0da
address comments
thirtiseven Jan 6, 2026
9b1162e
address comments
thirtiseven Jan 6, 2026
f95910f
address comments
thirtiseven Jan 6, 2026
481bfbe
multi-thread reader
thirtiseven Jan 7, 2026
bd526c5
delete perf test
thirtiseven Jan 7, 2026
cf33cf4
address comments
thirtiseven Jan 8, 2026
af43f3e
address comments
thirtiseven Jan 8, 2026
288152a
remove COALESCING reader
thirtiseven Jan 8, 2026
ea91eab
fix
thirtiseven Jan 8, 2026
6a23c2e
fix
thirtiseven Jan 8, 2026
9847f16
make sequence file isSplitable to false due to data diff
thirtiseven Jan 9, 2026
c6b98fb
Merge branch 'seq_file_reader' of https://github.com/thirtiseven/spar…
thirtiseven Jan 9, 2026
70ad202
fix merge seqreader
thirtiseven Jan 9, 2026
f9f4a8c
use gpu reader
thirtiseven Jan 13, 2026
e6322bc
fix a bug
thirtiseven Jan 13, 2026
94f31ea
performance optimization
thirtiseven Jan 16, 2026
4139c00
fix
thirtiseven Jan 16, 2026
1b2fbe9
Revert "fix"
thirtiseven Jan 20, 2026
310ccbc
Revert "performance optimization"
thirtiseven Jan 20, 2026
81ccdfa
Revert "fix a bug"
thirtiseven Jan 20, 2026
28d0405
Revert "use gpu reader"
thirtiseven Jan 20, 2026
dcf6af0
fix OOM bug
thirtiseven Jan 20, 2026
87f5a72
performance optimization
thirtiseven Jan 20, 2026
bcfcbc7
integration tests
thirtiseven Jan 21, 2026
7cc02cf
splitable true by default
thirtiseven Jan 21, 2026
143fc3d
logical rule
thirtiseven Jan 23, 2026
dc0bbfc
save a memory copy
thirtiseven Jan 26, 2026
9619bc0
fix perfile config
thirtiseven Jan 27, 2026
d052441
GPU combine to produce larger batch
thirtiseven Jan 28, 2026
98ee00f
support glob style path
thirtiseven Feb 2, 2026
e4fef5a
a bug fix, RDD conversion refinement
thirtiseven Feb 3, 2026
0f8f8ca
support compress
thirtiseven Feb 4, 2026
c60c978
huge upmerge from dev branch
thirtiseven Feb 25, 2026
18015d6
refactor
thirtiseven Feb 26, 2026
1b297b9
Merge remote-tracking branch 'origin/main' into seq_file_reader
thirtiseven Mar 12, 2026
3fe2cd6
fix scala 2.13 build
thirtiseven Mar 12, 2026
ed7fa84
verify and refactor
thirtiseven Mar 12, 2026
05c42a6
address comments
thirtiseven Mar 13, 2026
9934152
style
thirtiseven Mar 13, 2026
4fab896
style refactor
thirtiseven Mar 13, 2026
a6120f4
verify
thirtiseven Mar 13, 2026
4c3bdf6
verify back 330
thirtiseven Mar 13, 2026
bdc343c
address commit
thirtiseven Mar 14, 2026
d8de16a
address commit
thirtiseven Mar 14, 2026
98fcaba
address commit
thirtiseven Mar 14, 2026
f7cb695
address commit
thirtiseven Mar 15, 2026
9288e9f
address comments
thirtiseven Mar 15, 2026
069d36c
address commit
thirtiseven Mar 16, 2026
3c0f35d
address commit
thirtiseven Mar 16, 2026
1b2621f
address commit
thirtiseven Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.sequencefile.GpuSequenceFileMultiFilePartitionReaderFactory
import com.nvidia.spark.rapids.sequencefile.GpuSequenceFilePartitionReaderFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
 * A FileFormat that reads Hadoop SequenceFiles and exposes the raw key/value
 * record bytes as Spark SQL BinaryType columns.
 *
 * This is a GPU-enabled scan format in the sense that it produces GPU-backed
 * ColumnarBatch output; the IO and byte parsing themselves happen on the CPU.
 */
class GpuReadSequenceFileBinaryFormat extends FileFormat with GpuReadFileFormatWithMetrics {

  /** The schema is fixed (key/value binary columns); files are never inspected. */
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    Some(SequenceFileBinaryFileFormat.dataSchema)

  /** SequenceFiles carry sync markers, so splits may start mid-file. */
  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: Path): Boolean = true

  /**
   * Builds the per-file (non-multi-file) reader function, wiring the session
   * and RAPIDS configuration into a partition reader factory.
   */
  override def buildReaderWithPartitionValuesAndMetrics(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration,
      metrics: Map[String, GpuMetric]): PartitionedFile => Iterator[InternalRow] = {
    // Hadoop Configuration is not serializable, so broadcast a serializable wrapper.
    val serializableConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    val sessionConf = sparkSession.sessionState.conf
    val readerFactory = GpuSequenceFilePartitionReaderFactory(
      sessionConf,
      serializableConf,
      requiredSchema,
      partitionSchema,
      new RapidsConf(sessionConf),
      metrics,
      options)
    PartitionReaderIterator.buildReader(readerFactory)
  }

  // Per-file reads are disabled by default: the multi-file reader is the
  // recommended path when scanning many small files.
  override def isPerFileReadEnabled(conf: RapidsConf): Boolean = false

  /** Builds the multi-file reader factory from the scan's own configuration. */
  override def createMultiFileReaderFactory(
      broadcastedConf: Broadcast[SerializableConfiguration],
      pushedFilters: Array[Filter],
      fileScan: GpuFileSourceScanExec): PartitionReaderFactory = {
    GpuSequenceFileMultiFilePartitionReaderFactory(
      fileScan.conf,
      broadcastedConf,
      fileScan.requiredSchema,
      fileScan.readPartitionSchema,
      fileScan.rapidsConf,
      fileScan.allMetrics,
      fileScan.queryUsesInputFile)
  }
}

object GpuReadSequenceFileBinaryFormat {
  /**
   * Tags the scan as unsupported on GPU when a requested "key" or "value"
   * column is not BinaryType. Columns with any other name are ignored here
   * (the reader emits nulls for them).
   */
  def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
    val keyName = SequenceFileBinaryFileFormat.KEY_FIELD
    val valueName = SequenceFileBinaryFileFormat.VALUE_FIELD
    meta.wrapped.requiredSchema.fields
      .filter { f =>
        f.name.equalsIgnoreCase(keyName) || f.name.equalsIgnoreCase(valueName)
      }
      .filterNot(_.dataType == org.apache.spark.sql.types.BinaryType)
      .foreach { f =>
        meta.willNotWorkOnGpu(
          s"SequenceFileBinary only supports BinaryType for " +
          s"'$keyName' and " +
          s"'$valueName' columns, but saw " +
          s"${f.name}: ${f.dataType.catalogString}")
      }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.io.DataOutputStream
import java.net.URI
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{DataOutputBuffer, SequenceFile}
import org.apache.hadoop.mapreduce.Job
import org.slf4j.LoggerFactory

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
* A Spark SQL file format that reads Hadoop SequenceFiles and returns raw bytes for key/value.
*
* The default inferred schema is:
* - key: BinaryType
* - value: BinaryType
*
* This format is intended to support protobuf payloads stored as raw bytes in the SequenceFile
* record value bytes. It currently only supports uncompressed SequenceFiles.
*
* Usage:
* {{{
* val df = spark.read
* .format("com.nvidia.spark.rapids.SequenceFileBinaryFileFormat")
* .load("path/to/sequencefiles")
* }}}
*/
class SequenceFileBinaryFileFormat extends FileFormat with DataSourceRegister with Serializable {
  import SequenceFileBinaryFileFormat._

  override def shortName(): String = SHORT_NAME

  // The schema is always the fixed (key, value) binary pair; no files are
  // opened during inference.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = Some(dataSchema)

  // SequenceFiles embed sync markers, so a read can start at an arbitrary
  // split offset and re-align via reader.sync() below.
  override def isSplitable(
      sparkSession: SparkSession,
      options: Map[String, String],
      path: Path): Boolean = true

  /**
   * Returns a function that opens one SequenceFile split and iterates its
   * records as InternalRows containing the raw key/value bytes for the
   * requested columns, followed by the partition-column values.
   *
   * Compressed files (record- or block-compressed) are rejected with
   * an UnsupportedOperationException.
   */
  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    // Hadoop Configuration is not serializable; Spark will serialize the returned reader function.
    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    (partFile: PartitionedFile) => {
      val filePathStr = partFile.filePath.toString
      val path = new Path(new URI(filePathStr))
      // Copy the broadcast conf: Configuration is mutable and may be shared
      // by concurrent tasks in the same executor.
      val conf = new Configuration(broadcastedHadoopConf.value.value)
      val reader =
        try {
          new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))
        } catch {
          case e: Exception =>
            val msg = s"Failed to open SequenceFile reader for $path"
            LoggerFactory.getLogger(classOf[SequenceFileBinaryFileFormat]).error(msg, e)
            throw e
        }

      // For the initial version, we explicitly fail fast on compressed SequenceFiles.
      // (Record- and block-compressed files can be added later.)
      if (reader.isCompressed || reader.isBlockCompressed) {
        val compressionType = reader.getCompressionType
        val msg = s"$SHORT_NAME does not support compressed SequenceFiles " +
          s"(compressionType=$compressionType), " +
          s"file=$path, keyClass=${reader.getKeyClassName}, " +
          s"valueClass=${reader.getValueClassName}"
        LoggerFactory.getLogger(classOf[SequenceFileBinaryFileFormat]).error(msg)
        reader.close()
        throw new UnsupportedOperationException(msg)
      }

      val start = partFile.start
      val end = start + partFile.length
      if (start > 0) {
        // Advance to the first sync marker at or after the split start so
        // reading begins on a record boundary.
        reader.sync(start)
      }

      val reqFields = requiredSchema.fields
      val reqLen = reqFields.length
      val partLen = partitionSchema.length
      val totalLen = reqLen + partLen
      val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)

      // Per required column: 1 = key, 2 = value, 0 = anything else (unknown
      // columns are emitted as null in buildRow).
      val fieldInfos = reqFields.map { f =>
        if (f.name.equalsIgnoreCase(KEY_FIELD)) 1
        else if (f.name.equalsIgnoreCase(VALUE_FIELD)) 2
        else 0
      }

      // Reusable buffers for the raw record bytes; contents are copied out
      // per row in buildRow.
      val keyBuf = new DataOutputBuffer()
      val valueBytes = reader.createValueBytes()
      val valueOut = new DataOutputBuffer()
      val valueDos = new DataOutputStream(valueOut)

      // Register a task completion listener to ensure the reader is closed
      // even if the iterator is abandoned early or an exception occurs.
      // NOTE(review): the iterator also closes the reader on exhaustion, so
      // close may run twice; SequenceFile.Reader.close is presumed safe to
      // call repeatedly — TODO confirm.
      Option(TaskContext.get()).foreach { tc =>
        tc.addTaskCompletionListener[Unit](_ => reader.close())
      }

      new Iterator[InternalRow] {
        private[this] val unsafeProj = UnsafeProjection.create(outputSchema)
        private[this] var nextRow: InternalRow = _
        // prepared = a lookahead record has been fetched (or EOF detected)
        private[this] var prepared = false
        private[this] var done = false

        override def hasNext: Boolean = {
          if (!prepared && !done) {
            prepared = true
            keyBuf.reset()
            // Stop once the next record's START passes the split end; a
            // record beginning before `end` is read fully even if it extends
            // past it. NOTE(review): unlike Hadoop's SequenceFileRecordReader
            // no syncSeen flag is tracked — assumed equivalent for the
            // uncompressed files allowed here; verify with split-boundary
            // tests.
            if (reader.getPosition < end && reader.nextRaw(keyBuf, valueBytes) >= 0) {
              nextRow = buildRow()
            } else {
              done = true
              close()
            }
          }
          !done
        }

        override def next(): InternalRow = {
          if (!hasNext) {
            throw new NoSuchElementException("End of stream")
          }
          // Hand off the lookahead row and force a fresh fetch on the next
          // hasNext call.
          prepared = false
          val ret = nextRow
          nextRow = null
          ret
        }

        // Copies the buffered raw bytes into a new row, appends partition
        // values, and converts to UnsafeRow.
        private def buildRow(): InternalRow = {
          val row = new GenericInternalRow(totalLen)
          // Extract the value bytes at most once per record, even if the
          // value column appears multiple times in the required schema.
          var valueCopied = false
          var i = 0
          while (i < reqLen) {
            fieldInfos(i) match {
              case 1 =>
                val keyLen = keyBuf.getLength
                row.update(i, util.Arrays.copyOf(keyBuf.getData, keyLen))
              case 2 =>
                if (!valueCopied) {
                  valueOut.reset()
                  valueBytes.writeUncompressedBytes(valueDos)
                  valueCopied = true
                }
                val valueLen = valueOut.getLength
                row.update(i, util.Arrays.copyOf(valueOut.getData, valueLen))
              case _ =>
                // Column name is neither "key" nor "value": emit null.
                row.setNullAt(i)
            }
            i += 1
          }

          // Append partition values (if any)
          var p = 0
          while (p < partLen) {
            val dt = partitionSchema.fields(p).dataType
            row.update(reqLen + p, partFile.partitionValues.get(p, dt))
            p += 1
          }
          // Spark expects UnsafeRow for downstream serialization.
          unsafeProj.apply(row).copy()
        }

        private def close(): Unit = {
          reader.close()
        }
      }
    }
  }

  /** Writing is not supported by this format; always throws. */
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    throw new UnsupportedOperationException(
      s"${this.getClass.getCanonicalName} does not support writing")
  }
}

/** Constants shared by the CPU and GPU SequenceFile binary formats. */
object SequenceFileBinaryFileFormat {
  /** Name registered for `spark.read.format(...)`. */
  final val SHORT_NAME: String = "sequencefilebinary"
  final val KEY_FIELD: String = "key"
  final val VALUE_FIELD: String = "value"

  /** Fixed read schema: raw key and value bytes, both nullable. */
  final val dataSchema: StructType =
    new StructType()
      .add(StructField(KEY_FIELD, BinaryType, nullable = true))
      .add(StructField(VALUE_FIELD, BinaryType, nullable = true))
}
Loading