Commit 6ded134

#809 Add support for file end offset for compressed files.
1 parent 0dc8c2d commit 6ded134

File tree: 5 files changed, +71 -24 lines changed
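For orientation, the file_end_offset option tells spark-cobol to ignore a fixed number of bytes at the end of each input file; before this change the option could not be honored for compressed inputs. A minimal usage sketch in Scala (the copybook and data paths are hypothetical; the offset value mirrors the new test added below):

// Hypothetical paths; with this change file_end_offset also applies to compressed (e.g. gzip) files.
val df = spark
  .read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cob")
  .option("file_end_offset", 1493)   // bytes to drop at the end of each file
  .load("/path/to/ebcdic_data")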

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ class DefaultSource
     val hasCompressedFiles = filesList.exists(_.isCompressed)

     if (hasCompressedFiles) {
-      logger.info(s"Compressed files found. Binary parallelism and indexes won't be used for them.")
+      logger.info(s"Compressed files found. Binary parallelism and indexes will be adjusted accordingly.")
     }

     new CobolRelation(cobolParameters.sourcePaths,

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 8 additions & 2 deletions
@@ -32,7 +32,7 @@ import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
 import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
 import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
 import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
-import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}
+import za.co.absa.cobrix.spark.cobol.utils.{FileUtils, HDFSUtils, SparkUtils}

 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable.ArrayBuffer
@@ -231,7 +231,13 @@ private[cobol] object IndexBuilder extends Logging {
     val maximumBytes = if (fileEndOffset == 0) {
       0
     } else {
-      val bytesToRead = fileSystem.getContentSummary(path).getLength - fileEndOffset - startOffset
+      val fileSize = if (FileUtils.isCompressed(path, config)) {
+        FileUtils.getCompressedFileSize(path,config)
+      } else {
+        fileSystem.getFileStatus(path).getLen
+      }
+
+      val bytesToRead = fileSize - fileEndOffset - startOffset
       if (bytesToRead < 0)
         0
       else
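To make the offset arithmetic concrete, a small illustration with made-up numbers: for an uncompressed size of 10,000 bytes, a start offset of 100 and an end offset of 1,493, at most 10,000 - 1,493 - 100 = 8,407 bytes are read, and negative results are clamped to 0. A sketch mirroring the bytesToRead logic above:

// Illustrative values only; variable names follow the diff above.
val fileSize      = 10000L // uncompressed size (full scan for compressed files)
val startOffset   = 100L   // bytes skipped at the start of the file
val fileEndOffset = 1493L  // bytes dropped at the end of the file
val bytesToRead   = fileSize - fileEndOffset - startOffset
val maximumBytes  = if (bytesToRead < 0) 0L else bytesToRead // 8407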

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala

Lines changed: 5 additions & 17 deletions
@@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol.source.scanners

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.spark.rdd.RDD
@@ -73,17 +72,13 @@ private[source] object CobolScanners extends Logging {
       val maximumFileBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
         0
       } else {
-        if (isCompressed(path, sconf.value)) {
-          // ToDo determine if the uncompressed file size can be effectively fetched
-          if (reader.getReaderProperties.fileEndOffset > 0) {
-            logger.warn(s"File end offset for $path is ignored because the file is compressed.")
-          }
-          0L
+        val fileSize = if (FileUtils.isCompressed(path, sconf.value)) {
+          FileUtils.getCompressedFileSize(path, sconf.value)
         } else {
-          val fileSize = fileSystem.getFileStatus(path).getLen
-
-          fileSize - reader.getReaderProperties.fileEndOffset - startFileOffset
+          fileSystem.getFileStatus(path).getLen
         }
+
+        fileSize - reader.getReaderProperties.fileEndOffset - startFileOffset
       }
       val dataStream = new FileStreamer(filePath, sconf.value, startFileOffset, maximumFileBytes)
       val headerStream = new FileStreamer(filePath, sconf.value, startFileOffset)
@@ -92,13 +87,6 @@
     })
   }

-  private[source] def isCompressed(file: Path, hadoopConfig: Configuration): Boolean = {
-    val factory = new CompressionCodecFactory(hadoopConfig)
-    val codec = factory.getCodec(file)
-
-    codec != null
-  }
-
   private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDirs: Seq[String],
                                               recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row],
                                               debugIgnoreFileSize: Boolean,

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala

Lines changed: 40 additions & 4 deletions
@@ -16,13 +16,14 @@

 package za.co.absa.cobrix.spark.cobol.utils

-import java.io.{FileOutputStream, OutputStreamWriter, PrintWriter}
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import za.co.absa.cobrix.cobol.internal.Logging

+import java.io.{FileOutputStream, IOException, OutputStreamWriter, PrintWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
 import scala.collection.JavaConverters._

 /**
@@ -33,7 +34,6 @@ import scala.collection.JavaConverters._
  * Applies the same filter as Hadoop's FileInputFormat, which excludes files starting with '.' or '_'.
  */
 object FileUtils extends Logging {
-
   val THRESHOLD_DIR_LENGTH_FOR_SINGLE_FILE_CHECK = 50

   private val hiddenFileFilter = new PathFilter() {
@@ -216,6 +216,42 @@ object FileUtils extends Logging {
     allNonDivisibleFiles.map(status => (status.getPath.toString, status.getLen))
   }

+  def isCompressed(file: Path, hadoopConfig: Configuration): Boolean = {
+    val factory = new CompressionCodecFactory(hadoopConfig)
+    val codec = factory.getCodec(file)
+
+    codec != null
+  }
+
+  def getCompressedFileSize(file: Path, hadoopConfig: Configuration): Long = {
+    logger.warn(s"Using full scan to determine file size of $file..")
+    val factory = new CompressionCodecFactory(hadoopConfig)
+    val codec = factory.getCodec(file)
+    val fileSystem = file.getFileSystem(hadoopConfig)
+    val fsIn: FSDataInputStream = fileSystem.open(file)
+    val ifs = codec.createInputStream(fsIn)
+
+    val size = try {
+      val SKIP_BUFFER_SIZE = 1024*1024*50
+      var totalBytesSkipped = 0L
+      var skippedLast = 1L
+      while (skippedLast > 0) {
+        skippedLast = ifs.skip(SKIP_BUFFER_SIZE)
+        if (skippedLast > 0)
+          totalBytesSkipped += skippedLast
+      }
+      totalBytesSkipped
+    } catch {
+      case e: IOException =>
+        throw new IOException(s"Unable to determine compressed file size for $file", e)
+    } finally {
+      ifs.close()
+      fsIn.close()
+    }
+    logger.info(s"The size of the uncompressed file $file is $size bytes.")
+    size
+  }
+
   private def isNonDivisible(fileStatus: FileStatus, divisor: Long) = fileStatus.getLen % divisor != 0

   /**
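The two helpers added above are shared by the scanner and the index builder; note that getCompressedFileSize decompresses the entire stream, so it is a full pass over the file. A minimal sketch of calling them directly (the input path is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import za.co.absa.cobrix.spark.cobol.utils.FileUtils

val conf = new Configuration()               // e.g. spark.sparkContext.hadoopConfiguration
val path = new Path("/data/records.dat.gz")  // hypothetical compressed input

val effectiveSize =
  if (FileUtils.isCompressed(path, conf))
    FileUtils.getCompressedFileSize(path, conf)           // full decompression pass
  else
    path.getFileSystem(conf).getFileStatus(path).getLen   // cheap metadata lookup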

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala

Lines changed: 17 additions & 0 deletions
@@ -184,6 +184,23 @@ class Test40CompressesFilesSpec extends AnyFunSuite with SparkTestBase with Bina
     assert(df.count == 300)
   }

+  test("read mixed compressed EBCDIC files and file_end_offset") {
+    val inputDataPath = "../data/test40_data"
+
+    val df = spark
+      .read
+      .format("cobol")
+      .option("copybook", inputCopybookPath)
+      .option("schema_retention_policy", "collapse_root")
+      .option("floating_point_format", "IEEE754")
+      .option("strict_sign_overpunching", "true")
+      .option("file_end_offset", 1493)
+      .option("pedantic", "true")
+      .load(inputDataPath)
+
+    assert(df.count == 297)
+  }
+
   test("read a compressed ASCII file 1") {
     testCompressedAsciiFile(Map(
       "record_format" -> "D"
