Commit 69bdafb

#809 Fix file end offset setting for compressed files.
1 parent: 04260cb

File tree

2 files changed (+38, -4 lines)


spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala

Lines changed: 19 additions & 1 deletion

@@ -18,6 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.scanners
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.spark.rdd.RDD
@@ -72,7 +73,17 @@ private[source] object CobolScanners extends Logging {
       val maximumFileBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
         0
       } else {
-        fileSystem.getFileStatus(path).getLen - reader.getReaderProperties.fileEndOffset - startFileOffset
+        if (isCompressed(path, sconf.value)) {
+          // ToDo determine if the uncompressed file size can be effectively fetched
+          if (reader.getReaderProperties.fileEndOffset > 0) {
+            logger.warn(s"File end offset for $path is ignored because the file is compressed.")
+          }
+          0L
+        } else {
+          val fileSize = fileSystem.getFileStatus(path).getLen
+
+          fileSize - reader.getReaderProperties.fileEndOffset - startFileOffset
+        }
       }
       val dataStream = new FileStreamer(filePath, sconf.value, startFileOffset, maximumFileBytes)
       val headerStream = new FileStreamer(filePath, sconf.value, startFileOffset)
@@ -81,6 +92,13 @@ private[source] object CobolScanners extends Logging {
     })
   }
 
+  private[source] def isCompressed(file: Path, hadoopConfig: Configuration): Boolean = {
+    val factory = new CompressionCodecFactory(hadoopConfig)
+    val codec = factory.getCodec(file)
+
+    codec != null
+  }
+
   private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDirs: Seq[String],
                                               recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row],
                                               debugIgnoreFileSize: Boolean,
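
For context, the new isCompressed() helper delegates the whole decision to Hadoop's CompressionCodecFactory, which matches a file's extension against the codecs registered in the configuration (gzip, bzip2 and others are registered out of the box). A minimal, self-contained sketch of that detection; the file names below are invented for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

object CodecDetectionExample {
  def main(args: Array[String]): Unit = {
    val factory = new CompressionCodecFactory(new Configuration())

    // getCodec matches purely on the file extension and returns null when
    // no registered codec claims it -- exactly the check isCompressed performs.
    Seq("data/records.dat", "data/records.dat.gz", "data/records.bz2").foreach { name =>
      val codec = Option(factory.getCodec(new Path(name)))
      println(s"$name -> ${codec.map(_.getClass.getSimpleName).getOrElse("uncompressed")}")
    }
  }
}

This also explains the warning added in the second hunk above: once a file is detected as compressed, its uncompressed length is unknown without reading it, so maximumFileBytes falls back to 0L (no limit) and the configured file end offset cannot be applied.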

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala

Lines changed: 19 additions & 3 deletions

@@ -86,8 +86,10 @@ class BufferedFSDataInputStream(filePath: Path, hadoopConfig: Configuration, sta
         offsetLeft += lengthLeft
         lengthLeft = 0
       } else {
-        if (bufferContainBytes > 0) {
-          System.arraycopy(buffer, bufferPos, b, offsetLeft, lengthLeft)
+        if (bufferContainBytes > 0 && lengthLeft > 0) {
+          val available = bufferContainBytes - bufferPos
+          val bytesToCopy = Math.min(lengthLeft, available)
+          System.arraycopy(buffer, bufferPos, b, offsetLeft, bytesToCopy)
           bufferPos += bufferContainBytes
           offsetLeft += bufferContainBytes
           lengthLeft -= bufferContainBytes
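
This first hunk fixes a bounds bug in the buffered read path: the old code always copied lengthLeft bytes out of the internal buffer, even when fewer bytes than that remained past bufferPos, which can overrun the buffer. Clamping the copy length to what is actually available avoids that. A toy illustration, with values made up so the buffer holds fewer bytes than the caller requests:

// Hypothetical sizes: 2 bytes remain in the buffer, the caller wants 10.
val buffer = new Array[Byte](8)    // internal buffer
val b = new Array[Byte](16)        // caller's destination array
val bufferContainBytes = 8         // valid bytes held in the buffer
val bufferPos = 6                  // current read position -> 2 bytes left
val lengthLeft = 10                // bytes the caller still wants

val available = bufferContainBytes - bufferPos
val bytesToCopy = Math.min(lengthLeft, available)
// Copies 2 bytes; passing lengthLeft here, as the old code did, would
// throw ArrayIndexOutOfBoundsException (source range 6..16 exceeds 8).
System.arraycopy(buffer, bufferPos, b, 0, bytesToCopy)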
@@ -128,12 +130,26 @@ class BufferedFSDataInputStream(filePath: Path, hadoopConfig: Configuration, sta
     val factory = new CompressionCodecFactory(hadoopConfig)
     val codec = factory.getCodec(filePath)
 
-    if (codec != null) {
+    val baseStream = if (codec != null) {
       isCompressedStream = true
       codec.createInputStream(fsIn)
     } else {
       // No compression detected
       fsIn
     }
+
+    if (startOffset > 0) {
+      if (codec == null) {
+        baseStream.seek(startOffset)
+      } else {
+        var toSkip = startOffset
+        while (toSkip > 0) {
+          val skipped = baseStream.skip(toSkip)
+          if (skipped <= 0) return baseStream
+          toSkip -= skipped
+        }
+      }
+    }
+    baseStream
   }
 }
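
This second hunk is the heart of the fix for compressed inputs: an uncompressed FSDataInputStream can seek() straight to startOffset, but codec streams are sequential, so the only way to honor the offset is to decompress and discard bytes. InputStream.skip() is allowed to skip fewer bytes than requested (and can return 0 at end of stream), which is why the commit loops. A generic version of that loop; the helper name skipFully is ours, not from the codebase:

import java.io.InputStream

object SkipHelper {
  // Skips up to n bytes, looping because skip() may make partial progress.
  // Returns how many bytes were actually skipped; less than n only on EOF
  // or a stream that refuses to advance, mirroring the early return above.
  def skipFully(in: InputStream, n: Long): Long = {
    var toSkip = n
    while (toSkip > 0) {
      val skipped = in.skip(toSkip)
      if (skipped <= 0) return n - toSkip
      toSkip -= skipped
    }
    n
  }
}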
