
Commit a0136c0

#809 Make index cache 'true' by default, remove some code duplication.
1 parent d5b8f51 commit a0136c0

File tree

9 files changed, +29 -33 lines changed


cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala

Lines changed: 4 additions & 6 deletions
@@ -42,6 +42,8 @@ import scala.reflect.ClassTag
 class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
                                       readerProperties: ReaderParameters,
                                       handler: RecordHandler[T]) extends VarLenReader with Logging with Serializable {
+  private val DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB = 1024
+  private val DEFAULT_FS_INDEX_SIZE_MULTIPLIER = 4
 
   protected val cobolSchema: CobolSchema = loadCopyBook(copybookContents)
 
@@ -217,13 +219,9 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
 
   private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
     if (isCompressed) {
-      readerProperties.inputSplitSizeCompressedMB.orElse(Some(1024))
+      readerProperties.inputSplitSizeCompressedMB.orElse(Some(DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB))
     } else {
-      if (readerProperties.inputSplitSizeMB.isDefined) {
-        readerProperties.inputSplitSizeMB
-      } else {
-        readerProperties.hdfsDefaultBlockSize
-      }
+      readerProperties.inputSplitSizeMB.orElse(readerProperties.fsDefaultBlockSize).map(_ * DEFAULT_FS_INDEX_SIZE_MULTIPLIER)
     }
   }
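As an aside, the simplified split-size logic is easy to exercise in isolation. The sketch below mirrors the two branches with plain parameters; the object and parameter names are illustrative and not part of the Cobrix API:

// Standalone sketch of the split-size resolution above (illustrative only).
object SplitSizeSketch {
  private val DefaultIndexSizeCompressedFilesMb = 1024
  private val DefaultFsIndexSizeMultiplier = 4

  def getSplitSizeMB(isCompressed: Boolean,
                     inputSplitSizeCompressedMB: Option[Int],
                     inputSplitSizeMB: Option[Int],
                     fsDefaultBlockSize: Option[Int]): Option[Int] = {
    if (isCompressed) {
      // Compressed files: explicit setting, otherwise 1024 MB
      inputSplitSizeCompressedMB.orElse(Some(DefaultIndexSizeCompressedFilesMb))
    } else {
      // Uncompressed files: explicit setting or the filesystem default block size,
      // scaled by the multiplier
      inputSplitSizeMB.orElse(fsDefaultBlockSize).map(_ * DefaultFsIndexSizeMultiplier)
    }
  }

  def main(args: Array[String]): Unit = {
    println(getSplitSizeMB(true, None, None, Some(128)))  // Some(1024)
    println(getSplitSizeMB(false, None, None, Some(128))) // Some(512)
  }
}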

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 2 additions & 2 deletions
@@ -424,7 +424,7 @@ object CobolParametersParser extends Logging {
       inputSplitRecords = varLenParams.inputSplitRecords,
       inputSplitSizeMB = varLenParams.inputSplitSizeMB,
       inputSplitSizeCompressedMB = varLenParams.inputSplitSizeCompressedMB,
-      hdfsDefaultBlockSize = defaultBlockSize,
+      fsDefaultBlockSize = defaultBlockSize,
       startOffset = parameters.recordStartOffset,
       endOffset = parameters.recordEndOffset,
       fileStartOffset = varLenParams.fileStartOffset,
@@ -508,7 +508,7 @@
       fileEndOffset,
       isRecordIdGenerationEnabled,
       params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
-      params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
+      params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "true").toBoolean,
       params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
       params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
       params.get(PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB).map(v => v.toInt),
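Because PARAM_ENABLE_INDEX_CACHE now defaults to "true", the index cache is active unless a job opts out. A minimal, hypothetical spark-cobol read that restores the old behaviour (paths are placeholders):

import org.apache.spark.sql.SparkSession

// Illustrative only: disabling the index cache now requires an explicit option.
object IndexCacheOptOut {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cobrix-index-cache-example")
      .master("local[*]")
      .getOrCreate()

    val df = spark.read
      .format("cobol")
      .option("copybook", "/path/to/copybook.cpy")  // placeholder
      .option("enable_index_cache", "false")        // default is now "true"
      .load("/path/to/data")                        // placeholder

    df.show(10)
  }
}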

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala

Lines changed: 2 additions & 2 deletions
@@ -51,7 +51,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
   * @param inputSplitRecords          The number of records to include in each partition. Notice mainframe records may have variable size, inputSplitMB is the recommended option
   * @param inputSplitSizeMB           A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size
   * @param inputSplitSizeCompressedMB A partition size to target for compressed files.
-  * @param hdfsDefaultBlockSize       Default HDFS block size for the HDFS filesystem used. This value is used as the default split size if inputSplitSizeMB is not specified
+  * @param fsDefaultBlockSize         Default HDFS block size for the HDFS filesystem used. This value is used as the default split size if inputSplitSizeMB is not specified
   * @param startOffset                An offset to the start of the record in each binary data block.
   * @param endOffset                  An offset from the end of the record to the end of the binary data block.
   * @param fileStartOffset            A number of bytes to skip at the beginning of each file
@@ -104,7 +104,7 @@ case class ReaderParameters(
       inputSplitRecords: Option[Int] = None,
       inputSplitSizeMB: Option[Int] = None,
       inputSplitSizeCompressedMB: Option[Int] = None,
-      hdfsDefaultBlockSize: Option[Int] = None,
+      fsDefaultBlockSize: Option[Int] = None,
       startOffset: Int = 0,
       endOffset: Int = 0,
       fileStartOffset: Int = 0,

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 3 additions & 3 deletions
@@ -174,7 +174,7 @@ object DefaultSource {
     */
   def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultFsBlockSize(spark, parameters.sourcePaths.headOption)
     new FixedLenTextReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )
   }
@@ -185,7 +185,7 @@
   def createFixedLengthReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
 
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultFsBlockSize(spark, parameters.sourcePaths.headOption)
     new FixedLenNestedReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )
   }
@@ -199,7 +199,7 @@
 
 
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultFsBlockSize(spark, parameters.sourcePaths.headOption)
     new VarLenNestedReader(
       copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/BufferedFSDataInputStream.scala

Lines changed: 2 additions & 4 deletions
@@ -18,7 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.streaming
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataInputStream, Path}
-import org.apache.hadoop.io.compress.CompressionCodecFactory
+import za.co.absa.cobrix.spark.cobol.utils.FileUtils
 
 import java.io.{IOException, InputStream}
 
@@ -121,11 +121,9 @@ class BufferedFSDataInputStream(filePath: Path, hadoopConfig: Configuration, sta
 
   private def openStream(): InputStream = {
     val fileSystem = filePath.getFileSystem(hadoopConfig)
+    val codec = FileUtils.getCompressionCodec(filePath, hadoopConfig)
     val fsIn: FSDataInputStream = fileSystem.open(filePath)
 
-    val factory = new CompressionCodecFactory(hadoopConfig)
-    val codec = factory.getCodec(filePath)
-
     val baseStream = if (codec != null) {
       isCompressedStream = true
       codec.createInputStream(fsIn)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala

Lines changed: 2 additions & 7 deletions
@@ -18,10 +18,10 @@ package za.co.absa.cobrix.spark.cobol.source.streaming
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{ContentSummary, Path}
-import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.log4j.Logger
 import za.co.absa.cobrix.cobol.reader.common.Constants
 import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+import za.co.absa.cobrix.spark.cobol.utils.FileUtils
 
 import java.io.IOException
 
@@ -51,12 +51,7 @@ class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: L
   private var wasOpened = false
   private var bufferedStream: BufferedFSDataInputStream = _
 
-  private lazy val isCompressedStream = {
-    val factory = new CompressionCodecFactory(hadoopConfig)
-    val codec = factory.getCodec(hadoopPath)
-
-    codec != null
-  }
+  private lazy val isCompressedStream = FileUtils.isCompressed(hadoopPath, hadoopConfig)
 
   private lazy val fileSize = getHadoopFileSize(hadoopPath)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/FileUtils.scala

Lines changed: 7 additions & 6 deletions
@@ -18,7 +18,7 @@ package za.co.absa.cobrix.spark.cobol.utils
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}
 import za.co.absa.cobrix.cobol.internal.Logging
 
 import java.io.{FileOutputStream, IOException, OutputStreamWriter, PrintWriter}
@@ -217,16 +217,17 @@ object FileUtils extends Logging {
   }
 
   def isCompressed(file: Path, hadoopConfig: Configuration): Boolean = {
-    val factory = new CompressionCodecFactory(hadoopConfig)
-    val codec = factory.getCodec(file)
+    getCompressionCodec(file, hadoopConfig) != null
+  }
 
-    codec != null
+  def getCompressionCodec(file: Path, hadoopConfig: Configuration): CompressionCodec = {
+    val factory = new CompressionCodecFactory(hadoopConfig)
+    factory.getCodec(file)
   }
 
   def getCompressedFileSize(file: Path, hadoopConfig: Configuration): Long = {
     logger.warn(s"Using full scan to determine file size of $file..")
-    val factory = new CompressionCodecFactory(hadoopConfig)
-    val codec = factory.getCodec(file)
+    val codec = getCompressionCodec(file, hadoopConfig)
     val fileSystem = file.getFileSystem(hadoopConfig)
     val fsIn: FSDataInputStream = fileSystem.open(file)
     val ifs = codec.createInputStream(fsIn)
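The new getCompressionCodec helper centralises the codec lookup that BufferedFSDataInputStream, FileStreamer and getCompressedFileSize previously duplicated. A rough, standalone sketch of the underlying Hadoop pattern (the object and method names here are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory}

// CompressionCodecFactory infers the codec from the file extension and
// returns null when the file is not compressed (or the codec is unknown).
object CodecLookupSketch {
  def lookupCodec(path: String, conf: Configuration = new Configuration()): Option[CompressionCodec] =
    Option(new CompressionCodecFactory(conf).getCodec(new Path(path)))

  def main(args: Array[String]): Unit = {
    println(lookupCodec("/data/records.dat.gz").map(_.getClass.getSimpleName)) // likely Some(GzipCodec)
    println(lookupCodec("/data/records.dat"))                                  // None
  }
}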

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala

Lines changed: 3 additions & 3 deletions
@@ -19,12 +19,12 @@ package za.co.absa.cobrix.spark.cobol.utils
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.functions.{array, col, expr, max, struct}
-import za.co.absa.cobrix.spark.cobol.utils.impl.HofsWrapper.transform
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.MAX_ELEMENTS
+import za.co.absa.cobrix.spark.cobol.utils.impl.HofsWrapper.transform
 
 import scala.annotation.tailrec
 import scala.collection.mutable
@@ -489,7 +489,7 @@
     }.getOrElse(None)
   }
 
-  def getDefaultHdfsBlockSize(spark: SparkSession, pathOpt: Option[String]): Option[Int] = {
+  def getDefaultFsBlockSize(spark: SparkSession, pathOpt: Option[String]): Option[Int] = {
     val conf = spark.sparkContext.hadoopConfiguration
 
     val fileSystem = pathOpt match {
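The rename getDefaultHdfsBlockSize → getDefaultFsBlockSize reflects that the block size can come from whichever Hadoop-compatible filesystem backs the input path, not only HDFS. A hedged sketch of what such a lookup can look like; this is an approximation for illustration, not necessarily the exact Cobrix implementation:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object DefaultBlockSizeSketch {
  // Returns the default block size of the filesystem backing the path, in MB.
  def getDefaultFsBlockSize(spark: SparkSession, pathOpt: Option[String]): Option[Int] = {
    val conf = spark.sparkContext.hadoopConfiguration
    val fileSystem = pathOpt match {
      case Some(path) => new Path(path).getFileSystem(conf)
      case None       => FileSystem.get(conf)
    }
    val blockSizeBytes = fileSystem.getDefaultBlockSize(new Path("/"))
    if (blockSizeBytes > 0) Some((blockSizeBytes / (1024L * 1024L)).toInt) else None
  }
}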

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test40CompressesFilesSpec.scala

Lines changed: 4 additions & 0 deletions
@@ -56,6 +56,7 @@ class Test40CompressesFilesSpec extends AnyFunSuite with SparkTestBase with Bina
     val options = if (useIndexes) {
       Map(
         "input_split_records" -> "1",
+        "enable_index_cache" -> "false",
         "generate_record_id" -> "true"
       )
     } else {
@@ -178,6 +179,8 @@ class Test40CompressesFilesSpec extends AnyFunSuite with SparkTestBase with Bina
       .option("schema_retention_policy", "collapse_root")
       .option("floating_point_format", "IEEE754")
       .option("strict_sign_overpunching", "true")
+      .option("generate_record_id", "true")
+      .option("enable_index_cache", "false")
       .option("pedantic", "true")
       .load(inputDataPath)
@@ -195,6 +198,7 @@ class Test40CompressesFilesSpec extends AnyFunSuite with SparkTestBase with Bina
       .option("floating_point_format", "IEEE754")
       .option("strict_sign_overpunching", "true")
       .option("file_end_offset", 1493)
+      .option("enable_index_cache", "false")
       .option("pedantic", "true")
       .load(inputDataPath)
