Skip to content

Commit 2e10fbd

Browse files
committed
refactor
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
1 parent 562672a commit 2e10fbd

File tree

2 files changed

+169
-190
lines changed

2 files changed

+169
-190
lines changed

sql-plugin/src/main/scala/com/nvidia/spark/rapids/SequenceFileBinaryFileFormat.scala

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,36 @@ class SequenceFileBinaryFileFormat extends FileFormat with DataSourceRegister wi
9292
throw e
9393
}
9494

95-
// For the initial version, we explicitly fail fast on compressed SequenceFiles.
96-
// (Record- and block-compressed files can be added later.)
97-
if (reader.isCompressed || reader.isBlockCompressed) {
98-
val compressionType = reader.getCompressionType
99-
val msg = s"$SHORT_NAME does not support compressed SequenceFiles " +
100-
s"(compressionType=$compressionType), " +
101-
s"file=$path, keyClass=${reader.getKeyClassName}, " +
102-
s"valueClass=${reader.getValueClassName}"
103-
LoggerFactory.getLogger(classOf[SequenceFileBinaryFileFormat]).error(msg)
104-
reader.close()
105-
throw new UnsupportedOperationException(msg)
95+
// Register a task completion listener to ensure the reader is closed
96+
// even if the iterator is abandoned early or an exception occurs.
97+
Option(TaskContext.get()).foreach { tc =>
98+
tc.addTaskCompletionListener[Unit](_ => reader.close())
10699
}
107100

108101
val start = partFile.start
109-
val end = start + partFile.length
110-
if (start > 0) {
111-
reader.sync(start)
102+
try {
103+
// For the initial version, we explicitly fail fast on compressed SequenceFiles.
104+
// (Record- and block-compressed files can be added later.)
105+
if (reader.isCompressed || reader.isBlockCompressed) {
106+
val compressionType = reader.getCompressionType
107+
val msg = s"$SHORT_NAME does not support compressed SequenceFiles " +
108+
s"(compressionType=$compressionType), " +
109+
s"file=$path, keyClass=${reader.getKeyClassName}, " +
110+
s"valueClass=${reader.getValueClassName}"
111+
LoggerFactory.getLogger(classOf[SequenceFileBinaryFileFormat]).error(msg)
112+
throw new UnsupportedOperationException(msg)
113+
}
114+
115+
if (start > 0) {
116+
reader.sync(start)
117+
}
118+
} catch {
119+
case e: Throwable =>
120+
reader.close()
121+
throw e
112122
}
113123

124+
val end = start + partFile.length
114125
val reqFields = requiredSchema.fields
115126
val reqLen = reqFields.length
116127
val partLen = partitionSchema.length
@@ -128,12 +139,6 @@ class SequenceFileBinaryFileFormat extends FileFormat with DataSourceRegister wi
128139
val valueOut = new DataOutputBuffer()
129140
val valueDos = new DataOutputStream(valueOut)
130141

131-
// Register a task completion listener to ensure the reader is closed
132-
// even if the iterator is abandoned early or an exception occurs.
133-
Option(TaskContext.get()).foreach { tc =>
134-
tc.addTaskCompletionListener[Unit](_ => reader.close())
135-
}
136-
137142
new Iterator[InternalRow] {
138143
private[this] val unsafeProj = UnsafeProjection.create(outputSchema)
139144
private[this] var nextRow: InternalRow = _

0 commit comments

Comments
 (0)