
Commit 92c67eb

#809 Add support for reading compressed EBCDIC files.
1 parent 7cbf3af · commit 92c67eb

14 files changed: +138 −80 lines changed

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala

Lines changed: 2 additions & 0 deletions

@@ -29,6 +29,8 @@ trait SimpleStream {

   def isEndOfStream: Boolean = offset >= size

+  def isCompressed: Boolean = false
+
   @throws(classOf[Exception])
   def copyStream(): SimpleStream
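
Note: the trait-level default means only streams that actually wrap a decompressing codec report true. A minimal sketch of how callers can branch on the flag (the helper name is hypothetical, not part of this commit):

    import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

    // Sparse indexing needs to seek to arbitrary byte offsets, which a decompressed
    // stream cannot do, so offset-based optimizations can be gated on the new flag.
    def canUseSparseIndex(stream: SimpleStream): Boolean = !stream.isCompressed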

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 4 additions & 4 deletions

@@ -229,8 +229,8 @@ object SparkCobolProcessor {
       val numOfBytesMsg = if (numOfBytes > 0) s"${numOfBytes / Constants.megabyte} MB" else "until the end"

       log.info(s"Going to process offsets ${indexEntry.offsetFrom}...${indexEntry.offsetTo} ($numOfBytesMsg) of $fileName")
-      val dataStream = new FileStreamer(filePathName, fileSystem, indexEntry.offsetFrom, numOfBytes)
-      val headerStream = new FileStreamer(filePathName, fileSystem)
+      val dataStream = new FileStreamer(filePathName, sconf.value, indexEntry.offsetFrom, numOfBytes)
+      val headerStream = new FileStreamer(filePathName, sconf.value)

       CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, dataStream, Some(headerStream))
     })
@@ -240,7 +240,7 @@
       val hadoopConfig = sconf.value
       log.info(s"Going to process data from $inputFile")
       val inputFs = new Path(inputFile).getFileSystem(hadoopConfig)
-      val ifs = new FileStreamer(inputFile, inputFs)
+      val ifs = new FileStreamer(inputFile, sconf.value)

       CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, ifs, None)
     }
@@ -266,7 +266,7 @@
       Future {
         val hadoopConfig = sconf.value
         val inputFs = new Path(inputFIle).getFileSystem(hadoopConfig)
-        val ifs = new FileStreamer(inputFIle, inputFs)
+        val ifs = new FileStreamer(inputFIle, sconf.value)
         val outputFile = new Path(outputPath, fileName)
         val outputFs = outputFile.getFileSystem(hadoopConfig)
         val ofs = new BufferedOutputStream(outputFs.create(outputFile, true))
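
Note: the FileStreamer constructor now takes the Hadoop Configuration (sconf.value) rather than a FileSystem handle. A FileSystem is not serializable, while the configuration wrapped in SerializableConfiguration can be shipped to executors, and it is also what the codec lookup needs. A minimal sketch of the pattern, assuming a SparkContext named sc, a hypothetical input path, and that SerializableConfiguration is importable from the source package:

    import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
    import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer

    // Driver side: wrap the configuration so it survives task serialization.
    val sconf = new SerializableConfiguration(sc.hadoopConfiguration)

    // Executor side: build the streamer from the configuration; the FileSystem
    // (and any compression codec) is resolved lazily where the task runs.
    val ifs = new FileStreamer("/data/mainframe/input1.dat", sconf.value)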

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala

Lines changed: 15 additions & 16 deletions

@@ -16,22 +16,22 @@

 package za.co.absa.cobrix.spark.cobol.source

-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext}
-import za.co.absa.cobrix.spark.cobol.reader.{FixedLenReader, FixedLenTextReader, Reader, VarLenReader}
 import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
+import za.co.absa.cobrix.spark.cobol.reader.{FixedLenReader, FixedLenTextReader, Reader, VarLenReader}
 import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
 import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
 import za.co.absa.cobrix.spark.cobol.source.scanners.CobolScanners
 import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
 import za.co.absa.cobrix.spark.cobol.utils.FileUtils

+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import scala.util.control.NonFatal

@@ -63,6 +63,7 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
  * Its constructor is expected to change after the hierarchy of [[za.co.absa.cobrix.spark.cobol.reader.Reader]] is put in place.
  */
 class CobolRelation(sourceDirs: Seq[String],
+                    filesList: Array[FileWithOrder],
                     cobolReader: Reader,
                     localityParams: LocalityParameters,
                     debugIgnoreFileSize: Boolean)
@@ -71,8 +72,6 @@ class CobolRelation(sourceDirs: Seq[String],
     with Serializable
     with TableScan {

-  private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)
-
   private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cobolReader.getReaderProperties.isIndexCachingAllowed)(localityParams)

   override def schema: StructType = {
@@ -94,15 +93,7 @@ class CobolRelation(sourceDirs: Seq[String],
     }
   }

-  /**
-   * Checks if the recursive file retrieval flag is set
-   */
-  private def isRecursiveRetrieval: Boolean = {
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
-    hadoopConf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
-  }
-
-  private[source] def parseRecords(reader: FixedLenReader, records: RDD[Array[Byte]]) = {
+  private[source] def parseRecords(reader: FixedLenReader, records: RDD[Array[Byte]]): RDD[Row] = {
     records.flatMap(record => {
       val it = reader.getRowIterator(record)
       for (parsedRecord <- it) yield {
@@ -125,8 +116,16 @@ object CobolRelation {
         .getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval)
     }).toArray

+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val factory = new CompressionCodecFactory(hadoopConf)
+
     allFiles
       .zipWithIndex
-      .map(file => FileWithOrder(file._1, file._2))
+      .map { case (fileName, order) =>
+        val codec = factory.getCodec(new Path(fileName))
+        val isCompressed = codec != null
+
+        FileWithOrder(fileName, order, isCompressed)
+      }
   }
 }
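
Note: getListFilesWithOrder now tags each discovered file with whether a Hadoop compression codec is registered for its suffix. A small standalone sketch of the detection (paths are hypothetical; with default Hadoop settings a .gz suffix resolves to GzipCodec):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.compress.CompressionCodecFactory

    val factory = new CompressionCodecFactory(new Configuration())

    // getCodec matches on the file name suffix and returns null when nothing matches,
    // which is exactly how isCompressed is derived above.
    val plain = factory.getCodec(new Path("/data/account1.dat"))     // null  -> isCompressed = false
    val gzip  = factory.getCodec(new Path("/data/account2.dat.gz"))  // codec -> isCompressed = true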

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 21 additions & 3 deletions

@@ -18,6 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source

 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, NullWritable}
+import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
@@ -59,8 +60,17 @@ class DefaultSource
     val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
     CobolParametersValidator.checkSanity(cobolParameters)

+    val filesList = CobolRelation.getListFilesWithOrder(cobolParameters.sourcePaths, sqlContext, isRecursiveRetrieval(sqlContext))
+
+    val hasCompressedFiles = filesList.exists(_.isCompressed)
+
+    if (hasCompressedFiles) {
+      logger.info(s"Compressed files found. Binary parallelism and indexes won't be used for them.")
+    }
+
     new CobolRelation(cobolParameters.sourcePaths,
-      buildEitherReader(sqlContext.sparkSession, cobolParameters),
+      filesList,
+      buildEitherReader(sqlContext.sparkSession, cobolParameters, hasCompressedFiles),
       LocalityParameters.extract(cobolParameters),
       cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
@@ -126,6 +136,14 @@ class DefaultSource

   //TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
   override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null
+
+  /**
+   * Checks if the recursive file retrieval flag is set
+   */
+  private def isRecursiveRetrieval(sqlContext: SQLContext): Boolean = {
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    hadoopConf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+  }
 }

 object DefaultSource {
@@ -136,10 +154,10 @@ object DefaultSource {
    *
    * This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place.
    */
-  def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
+  def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters, hasCompressedFiles: Boolean): Reader = {
     val reader = if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
       createTextReader(cobolParameters, spark)
-    } else if (cobolParameters.variableLengthParams.isEmpty) {
+    } else if (cobolParameters.variableLengthParams.isEmpty && !hasCompressedFiles) {
       createFixedLengthReader(cobolParameters, spark)
     }
     else {
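
Note: with hasCompressedFiles set, a fixed-length load without variable-length parameters falls through to the variable-length reader path, since fixed-length binary parallelism depends on splitting files at byte offsets. A hedged usage sketch of reading a gzipped EBCDIC file (the copybook and data paths are hypothetical; option names follow the usual Cobrix conventions and may differ by version):

    // The compressed file is detected from its extension; no extra option is needed.
    val df = spark.read
      .format("cobol")
      .option("copybook", "/copybooks/account.cpy")
      .load("/data/account.dat.gz")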

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 8 additions & 3 deletions

@@ -205,7 +205,12 @@ private[cobol] object IndexBuilder extends Logging {

     val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config)
     val index = try {
-      reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)
+      if (inputStream.isCompressed) {
+        val element = SparseIndexEntry(0, -1, fileOrder, 0L)
+        ArrayBuffer[SparseIndexEntry](element)
+      } else {
+        reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)
+      }
     } finally {
       inputStream.close()
       headerStream.close()
@@ -238,8 +243,8 @@
       bytesToRead
     }

-    val inputStream = new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
-    val headerStream = new FileStreamer(filePath, fileSystem)
+    val inputStream = new FileStreamer(filePath, config, startOffset, maximumBytes)
+    val headerStream = new FileStreamer(filePath, config)

     (inputStream, headerStream, maximumBytes)
   }
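
Note: for a compressed input the builder emits a single sparse-index entry covering the whole file, so the file is consumed by one task rather than being split. Judging from how the scanners treat it, offsetTo = -1 plays the "read until the end" role (numOfBytes becomes 0 when offsetTo <= 0). A tiny sketch of such an entry (the fileId value is hypothetical):

    import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry

    // offsetFrom = 0, offsetTo = -1 (until the end), fileId = 3, recordIndex = 0
    val wholeFile = SparseIndexEntry(0, -1, 3, 0L)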

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala

Lines changed: 4 additions & 5 deletions

@@ -43,14 +43,13 @@ private[source] object CobolScanners extends Logging {
     indexes.flatMap(indexEntry => {
       val filePathName = filesMap(indexEntry.fileId)
       val path = new Path(filePathName)
-      val fileSystem = path.getFileSystem(sconf.value)
       val fileName = path.getName
       val numOfBytes = if (indexEntry.offsetTo > 0L) indexEntry.offsetTo - indexEntry.offsetFrom else 0L
       val numOfBytesMsg = if (numOfBytes > 0) s"${numOfBytes / Constants.megabyte} MB" else "until the end"

       logger.info(s"Going to process offsets ${indexEntry.offsetFrom}...${indexEntry.offsetTo} ($numOfBytesMsg) of $fileName")
-      val dataStream = new FileStreamer(filePathName, fileSystem, indexEntry.offsetFrom, numOfBytes)
-      val headerStream = new FileStreamer(filePathName, fileSystem)
+      val dataStream = new FileStreamer(filePathName, sconf.value, indexEntry.offsetFrom, numOfBytes)
+      val headerStream = new FileStreamer(filePathName, sconf.value)
       reader.getRowIterator(dataStream, headerStream, indexEntry.offsetFrom, indexEntry.fileId, indexEntry.recordIndex)
     })
   }
@@ -75,8 +74,8 @@
       } else {
         fileSystem.getFileStatus(path).getLen - reader.getReaderProperties.fileEndOffset - startFileOffset
       }
-      val dataStream = new FileStreamer(filePath, fileSystem, startFileOffset, maximumFileBytes)
-      val headerStream = new FileStreamer(filePath, fileSystem, startFileOffset)
+      val dataStream = new FileStreamer(filePath, sconf.value, startFileOffset, maximumFileBytes)
+      val headerStream = new FileStreamer(filePath, sconf.value, startFileOffset)
       reader.getRowIterator(dataStream, headerStream, startFileOffset, fileOrder, 0L)
     })
   })

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala

Lines changed: 28 additions & 7 deletions

@@ -16,21 +16,21 @@

 package za.co.absa.cobrix.spark.cobol.source.streaming

-import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.io.compress.CompressionCodecFactory

-import java.io.IOException
+import java.io.{IOException, InputStream}

-class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long ) {
+class BufferedFSDataInputStream(filePath: Path, hadoopConfig: Configuration, startOffset: Long, bufferSizeInMegabytes: Int, maximumBytes: Long ) {
   val bytesInMegabyte: Int = 1048576
+  private var isCompressedStream = false

   if (bufferSizeInMegabytes <=0 || bufferSizeInMegabytes > 1000) {
     throw new IllegalArgumentException(s"Invalid buffer size $bufferSizeInMegabytes MB.")
   }

-  var in: FSDataInputStream = fileSystem.open(filePath)
-  if (startOffset > 0) {
-    in.seek(startOffset)
-  }
+  private var in: InputStream = openStream()

   private val bufferSizeInBytes = bufferSizeInMegabytes * bytesInMegabyte
   private var isStreamClosed = in == null
@@ -51,6 +51,8 @@ class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOff

   def isClosed: Boolean = isStreamClosed && bufferPos >= bufferConitainBytes

+  def isCompressed: Boolean = isCompressedStream
+
   def readFully(b: Array[Byte], off: Int, len: Int): Int =
   {
     if (isClosed) {
@@ -115,4 +117,23 @@ class BufferedFSDataInputStream(filePath: Path, fileSystem: FileSystem, startOff
     }
   }

+  private def openStream(): InputStream = {
+    val fileSystem = filePath.getFileSystem(hadoopConfig)
+    val fsIn: FSDataInputStream = fileSystem.open(filePath)
+
+    if (startOffset > 0) {
+      fsIn.seek(startOffset)
+    }
+
+    val factory = new CompressionCodecFactory(hadoopConfig)
+    val codec = factory.getCodec(filePath)
+
+    if (codec != null) {
+      isCompressedStream = true
+      codec.createInputStream(fsIn)
+    } else {
+      // No compression detected
+      fsIn
+    }
+  }
 }
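
Note: openStream() seeks the raw FSDataInputStream first and only then wraps it in the codec's decompressing stream; because compressed files always receive a single whole-file index entry, the seek is effectively always from offset 0 for them. A standalone sketch of the same detect-and-wrap pattern (path is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.compress.CompressionCodecFactory
    import java.io.InputStream

    val conf = new Configuration()
    val path = new Path("/data/account.dat.gz")
    val fs = path.getFileSystem(conf)

    val raw = fs.open(path)                                       // seekable stream of compressed bytes
    val codec = new CompressionCodecFactory(conf).getCodec(path)
    val in: InputStream = if (codec != null) codec.createInputStream(raw) else raw  // decompressed if a codec matched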

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala

Lines changed: 13 additions & 7 deletions

@@ -16,11 +16,11 @@

 package za.co.absa.cobrix.spark.cobol.source.streaming

-import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{ContentSummary, Path}
 import org.apache.log4j.Logger
-import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
-import org.apache.hadoop.fs.ContentSummary
 import za.co.absa.cobrix.cobol.reader.common.Constants
+import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

 import java.io.IOException

@@ -33,10 +33,10 @@ import java.io.IOException
  * file be consumed.
  *
  * @param filePath String containing the fully qualified path to the file.
- * @param fileSystem Underlying Hadoop file system.
+ * @param hadoopConfig Hadoop configuration.
  * @note This class is not thread-safe and should only be accessed from a single thread
  */
-class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {
+class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream {

   private val logger = Logger.getLogger(FileStreamer.this.getClass)

@@ -59,6 +59,11 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =

   override def offset: Long = byteIndex

+  override def isCompressed: Boolean = {
+    ensureOpened()
+    bufferedStream.isCompressed
+  }
+
   /**
    * Retrieves a given number of bytes from the file stream.
    *
@@ -123,18 +128,19 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
   }

   override def copyStream(): SimpleStream = {
-    new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
+    new FileStreamer(filePath, hadoopConfig, startOffset, maximumBytes)
   }

   @throws[IOException]
   private def ensureOpened(): Unit = {
     if (!wasOpened) {
-      bufferedStream = new BufferedFSDataInputStream(new Path(filePath), fileSystem, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
+      bufferedStream = new BufferedFSDataInputStream(new Path(filePath), hadoopConfig, startOffset, Constants.defaultStreamBufferInMB, maximumBytes)
       wasOpened = true
     }
   }

   private def getHadoopFileSize(hadoopPath: Path): Long = {
+    val fileSystem = hadoopPath.getFileSystem(hadoopConfig)
     val cSummary: ContentSummary = fileSystem.getContentSummary(hadoopPath)
     cSummary.getLength
   }
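
Note: FileStreamer now resolves the FileSystem from the configuration and opens the file lazily; calling isCompressed triggers ensureOpened(), so the codec check happens on first use rather than at construction time. A minimal usage sketch (path is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer

    val conf = new Configuration()

    val stream = new FileStreamer("/data/account.dat.gz", conf)
    val compressed = stream.isCompressed   // opens the file and checks for a matching codec
    stream.close()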

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/types/FileWithOrder.scala

Lines changed: 1 addition & 1 deletion

@@ -19,4 +19,4 @@ package za.co.absa.cobrix.spark.cobol.source.types
 /**
  * Represents a file attached to an order.
  */
-private[source] case class FileWithOrder(filePath: String, order: Int)
+private[source] case class FileWithOrder(filePath: String, order: Int, isCompressed: Boolean)
