Commit 0dc8c2d

#809 Add support for indexes in compressed files.
1 parent 69bdafb commit 0dc8c2d

File tree

9 files changed, +103 -74 lines


README.md

Lines changed: 2 additions & 2 deletions

@@ -39,7 +39,7 @@ Among the motivations for this project, it is possible to highlight:
 
 - The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integrating into other data processing engines.
 
-- Supports reading files compressed in Hadoop-compatible way (gzip, bzip2, etc), but with limited parallelism (only per-file parallelism).
+- Supports reading files compressed in Hadoop-compatible way (gzip, bzip2, etc), but with limited parallelism.
   Uncompressed files are preferred for performance.
 
 ## Videos
@@ -1605,7 +1605,7 @@ The output looks like this:
 | .option("redefine-segment-id-map:0", "REDEFINED_FIELD1 => SegmentId1,SegmentId2,...") | Specifies a mapping between redefined field names and segment id values. Each option specifies a mapping for a single segment. The numeric value for each mapping option must be incremented so the option keys are unique. |
 | .option("segment-children:0", "COMPANY => EMPLOYEE,DEPARTMENT") | Specifies a mapping between segment redefined fields and their children. Each option specifies a mapping for a single parent field. The numeric value for each mapping option must be incremented so the option keys are unique. If such a mapping is specified, the hierarchical record structure is reconstructed automatically. This requires `redefine-segment-id-map` to be set. |
 | .option("enable_indexes", "true") | Turns on indexing of multisegment variable length files (on by default). |
-| .option("enable_index_cache", "false") | When true, calculated indexes are cached in memory for later use. This improves performance when the same files are processed more than once. |
+| .option("enable_index_cache", "true") | When true (default), calculated indexes are cached in memory for later use. This improves performance when the same files are processed more than once. |
 | .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. Each split is processed by a Spark task. (The default is not set and the split will happen according to size, see the next option) |
 | .option("input_split_size_mb", 100) | Specifies how many megabytes to allocate to each partition/split. (The default is 100 MB) |
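For reference, a minimal Spark usage sketch of the options touched by this change. The copybook and data paths are hypothetical, the file is assumed to be a gzipped variable-length (RDW) file, and `input_split_size_compressed_mb` is the new option introduced further down in this commit:

    // A sketch, not taken from the repository: reading a compressed variable-length EBCDIC file.
    val df = spark.read
      .format("cobol")
      .option("copybook", "/path/to/copybook.cpy")        // hypothetical copybook location
      .option("record_format", "V")                       // variable-length records with RDW headers
      .option("enable_indexes", "true")                   // on by default
      .option("enable_index_cache", "true")               // cache calculated indexes for repeated reads
      .option("input_split_size_compressed_mb", "1024")   // split size applied to compressed inputs
      .load("/path/to/data.dat.gz")                       // hypothetical compressed input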

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala

Lines changed: 13 additions & 6 deletions

@@ -144,7 +144,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
                             fileNumber: Int,
                             isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = {
     val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords
-    val inputSplitSizeMB: Option[Int] = getSplitSizeMB
+    val inputSplitSizeMB: Option[Int] = getSplitSizeMB(dataStream.isCompressed)
 
     if (inputSplitSizeRecords.isDefined) {
       if (inputSplitSizeRecords.get < 1 || inputSplitSizeRecords.get > 1000000000) {
@@ -153,7 +153,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
      logger.info(s"Input split size = ${inputSplitSizeRecords.get} records")
    } else {
      if (inputSplitSizeMB.nonEmpty) {
-        if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > 2000) {
+        if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > 200000) {
          throw new IllegalArgumentException(s"Invalid input split size of ${inputSplitSizeMB.get} MB.")
        }
        logger.info(s"Input split size = ${inputSplitSizeMB.get} MB")
@@ -214,11 +214,18 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
    }
  }
 
-  private def getSplitSizeMB: Option[Int] = {
-    if (readerProperties.inputSplitSizeMB.isDefined) {
-      readerProperties.inputSplitSizeMB
+  private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
+    if (isCompressed) {
+      readerProperties.inputSplitSizeCompressedMB match {
+        case Some(size) => readerProperties.inputSplitSizeCompressedMB
+        case None       => Some(1024)
+      }
    } else {
-      readerProperties.hdfsDefaultBlockSize
+      if (readerProperties.inputSplitSizeMB.isDefined) {
+        readerProperties.inputSplitSizeMB
+      } else {
+        readerProperties.hdfsDefaultBlockSize
+      }
    }
  }
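In short, the split size is now resolved differently for compressed and uncompressed streams, and the allowed upper bound was raised from 2000 MB to 200000 MB. A standalone sketch of the resolution logic introduced above; the function and parameter names here are illustrative, not part of the reader's API:

    // Mirrors getSplitSizeMB: compressed streams use the compressed split size or fall back to
    // 1024 MB; uncompressed streams use input_split_size_mb or the HDFS default block size.
    def resolveSplitSizeMb(isCompressed: Boolean,
                           splitSizeMb: Option[Int],
                           splitSizeCompressedMb: Option[Int],
                           hdfsBlockSizeMb: Option[Int]): Option[Int] =
      if (isCompressed) splitSizeCompressedMb.orElse(Some(1024))
      else splitSizeMb.orElse(hdfsBlockSizeMb)

    resolveSplitSizeMb(isCompressed = true,  Some(100), None,      Some(128)) // Some(1024): MB setting is ignored for compressed input
    resolveSplitSizeMb(isCompressed = true,  None,      Some(512), Some(128)) // Some(512)
    resolveSplitSizeMb(isCompressed = false, Some(100), None,      Some(128)) // Some(100)
    resolveSplitSizeMb(isCompressed = false, None,      None,      Some(128)) // Some(128)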

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 11 additions & 7 deletions

@@ -118,13 +118,14 @@ object CobolParametersParser extends Logging {
   val PARAM_SEGMENT_REDEFINE_PREFIX_ALT = "redefine-segment-id-map"
 
   // Indexed multisegment file processing
-  val PARAM_ENABLE_INDEXES               = "enable_indexes"
-  val PARAM_ENABLE_INDEX_CACHE           = "enable_index_cache"
-  val PARAM_INPUT_SPLIT_RECORDS          = "input_split_records"
-  val PARAM_INPUT_SPLIT_SIZE_MB          = "input_split_size_mb"
-  val PARAM_SEGMENT_ID_PREFIX            = "segment_id_prefix"
-  val PARAM_OPTIMIZE_ALLOCATION          = "optimize_allocation"
-  val PARAM_IMPROVE_LOCALITY             = "improve_locality"
+  val PARAM_ENABLE_INDEXES                 = "enable_indexes"
+  val PARAM_ENABLE_INDEX_CACHE             = "enable_index_cache"
+  val PARAM_INPUT_SPLIT_RECORDS            = "input_split_records"
+  val PARAM_INPUT_SPLIT_SIZE_MB            = "input_split_size_mb"
+  val PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB = "input_split_size_compressed_mb"
+  val PARAM_SEGMENT_ID_PREFIX              = "segment_id_prefix"
+  val PARAM_OPTIMIZE_ALLOCATION            = "optimize_allocation"
+  val PARAM_IMPROVE_LOCALITY               = "improve_locality"
 
   // Parameters for debugging
   val PARAM_DEBUG_LAYOUT_POSITIONS = "debug_layout_positions"
@@ -385,6 +386,7 @@
       isIndexCachingAllowed = false,
       inputSplitRecords = None,
       inputSplitSizeMB = None,
+      inputSplitSizeCompressedMB = None,
       improveLocality = false,
       optimizeAllocation = false,
       inputFileNameColumn = "",
@@ -421,6 +423,7 @@
       isIndexCachingAllowed = varLenParams.isIndexCachingAllowed,
       inputSplitRecords = varLenParams.inputSplitRecords,
       inputSplitSizeMB = varLenParams.inputSplitSizeMB,
+      inputSplitSizeCompressedMB = varLenParams.inputSplitSizeCompressedMB,
       hdfsDefaultBlockSize = defaultBlockSize,
       startOffset = parameters.recordStartOffset,
       endOffset = parameters.recordEndOffset,
@@ -508,6 +511,7 @@
       params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
       params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
       params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
+      params.get(PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB).map(v => v.toInt),
       params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean,
       params.getOrElse(PARAM_OPTIMIZE_ALLOCATION, "false").toBoolean,
       params.getOrElse(PARAM_INPUT_FILE_COLUMN, ""),
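The new key follows the same `params.get(...).map(_.toInt)` pattern as the other split options, so an absent option simply yields `None` and the 1024 MB fallback from the reader applies. An illustrative snippet, using a plain `Map[String, String]` in place of the real options container:

    // Illustrative only: how the new option resolves from a plain options map.
    val params = Map("input_split_size_compressed_mb" -> "2048")

    val withValue: Option[Int] = params.get("input_split_size_compressed_mb").map(_.toInt)                    // Some(2048)
    val absent: Option[Int]    = Map.empty[String, String].get("input_split_size_compressed_mb").map(_.toInt) // None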

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala

Lines changed: 3 additions & 1 deletion

@@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat
 import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
 import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy
 import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
-import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, FillerNamingPolicy, MetadataPolicy, StringTrimmingPolicy}
+import za.co.absa.cobrix.cobol.parser.policies._
 import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
 import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength
 import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
@@ -50,6 +50,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
   * @param isIndexGenerationNeeded    Is indexing input file before processing is requested
   * @param inputSplitRecords          The number of records to include in each partition. Notice mainframe records may have variable size, inputSplitMB is the recommended option
   * @param inputSplitSizeMB           A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size
+  * @param inputSplitSizeCompressedMB A partition size to target for compressed files.
   * @param hdfsDefaultBlockSize       Default HDFS block size for the HDFS filesystem used. This value is used as the default split size if inputSplitSizeMB is not specified
   * @param startOffset                An offset to the start of the record in each binary data block.
   * @param endOffset                  An offset from the end of the record to the end of the binary data block.
@@ -102,6 +103,7 @@ case class ReaderParameters(
                             isIndexCachingAllowed: Boolean = false,
                             inputSplitRecords: Option[Int] = None,
                             inputSplitSizeMB: Option[Int] = None,
+                            inputSplitSizeCompressedMB: Option[Int] = None,
                             hdfsDefaultBlockSize: Option[Int] = None,
                             startOffset: Int = 0,
                             endOffset: Int = 0,
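A minimal construction sketch, assuming the remaining `ReaderParameters` fields keep default values the way the fields visible above do; only the split-related fields are set explicitly here:

    // A sketch: the compressed split size sits alongside the existing split settings.
    val readerParams = ReaderParameters(
      isIndexGenerationNeeded    = true,
      inputSplitSizeMB           = Some(100),  // targeted partition size for uncompressed files
      inputSplitSizeCompressedMB = Some(512),  // targeted partition size for compressed files
      hdfsDefaultBlockSize       = Some(128)   // fallback when inputSplitSizeMB is not set
    )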

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala

Lines changed: 46 additions & 44 deletions

@@ -17,51 +17,53 @@
 package za.co.absa.cobrix.cobol.reader.parameters
 
 /**
-  * This class holds the parameters currently used for parsing variable-length records.
+  * This class is used to hold the parameters currently used for parsing variable-length records.
   *
-  * @param isRecordSequence      Does input files have 4 byte record length headers
-  * @param bdw                   Block descriptor word (if specified), for FB and VB record formats
-  * @param isRdwBigEndian        Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method
-  * @param isRdwPartRecLength    Does RDW count itself as part of record length itself
-  * @param rdwAdjustment         Controls a mismatch between RDW and record length
-  * @param recordHeaderParser    An optional custom record header parser for non-standard RDWs
-  * @param recordExtractor       An optional custom raw record parser class non-standard record types
-  * @param rhpAdditionalInfo     An optional additional option string passed to a custom record header parser
-  * @param reAdditionalInfo      An optional additional option string passed to a custom record extractor
-  * @param recordLengthField     A field that stores record length
-  * @param recordLengthMap       A mapping between field value and record size.
-  * @param fileStartOffset       A number of bytes to skip at the beginning of each file
-  * @param fileEndOffset         A number of bytes to skip at the end of each file
-  * @param generateRecordId      Generate a sequential record number for each record to be able to retain the order of the original data
-  * @param isUsingIndex          Is indexing input file before processing is requested
-  * @param isIndexCachingAllowed Is caching of generated index allowed
-  * @param inputSplitSizeMB      A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size
-  * @param inputSplitRecords     The number of records to include in each partition. Notice mainframe records may have variable size, inputSplitMB is the recommended option
-  * @param improveLocality       Tries to improve locality by extracting preferred locations for variable-length records
-  * @param optimizeAllocation    Optimizes cluster usage in case of optimization for locality in the presence of new nodes (nodes that do not contain any blocks of the files being processed)
-  * @param inputFileNameColumn   A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function
+  * @param isRecordSequence           Do input files have 4 byte record length headers
+  * @param bdw                        Block descriptor word (if specified), for FB and VB record formats
+  * @param isRdwBigEndian             Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method
+  * @param isRdwPartRecLength         Does RDW count itself as part of record length itself
+  * @param rdwAdjustment              Controls a mismatch between RDW and record length
+  * @param recordHeaderParser         An optional custom record header parser for non-standard RDWs
+  * @param recordExtractor            An optional custom raw record parser class for non-standard record types
+  * @param rhpAdditionalInfo          An optional additional option string passed to a custom record header parser
+  * @param reAdditionalInfo           An optional additional option string passed to a custom record extractor
+  * @param recordLengthField          A field that stores record length
+  * @param recordLengthMap            A mapping between field value and record size.
+  * @param fileStartOffset            A number of bytes to skip at the beginning of each file
+  * @param fileEndOffset              A number of bytes to skip at the end of each file
+  * @param generateRecordId           Generate a sequential record number for each record to be able to retain the order of the original data
+  * @param isUsingIndex               Is indexing input file before processing is requested
+  * @param isIndexCachingAllowed      Is caching of generated index allowed
+  * @param inputSplitSizeMB           A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size
+  * @param inputSplitSizeCompressedMB A partition size to target for compressed files.
+  * @param inputSplitRecords          The number of records to include in each partition. Notice mainframe records may have variable size, inputSplitMB is the recommended option
+  * @param improveLocality            Tries to improve locality by extracting preferred locations for variable-length records
+  * @param optimizeAllocation         Optimizes cluster usage in case of optimization for locality in the presence of new nodes (nodes that do not contain any blocks of the files being processed)
+  * @param inputFileNameColumn        A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function
   */
 case class VariableLengthParameters(
-                                     isRecordSequence: Boolean, // [deprecated by recordFormat]
-                                     bdw: Option[Bdw],
-                                     isRdwBigEndian: Boolean,
-                                     isRdwPartRecLength: Boolean,
-                                     rdwAdjustment: Int,
-                                     recordHeaderParser: Option[String],
-                                     recordExtractor: Option[String],
-                                     rhpAdditionalInfo: Option[String],
-                                     reAdditionalInfo: String,
-                                     recordLengthField: String,
-                                     recordLengthMap: Map[String, Int],
-                                     fileStartOffset: Int,
-                                     fileEndOffset: Int,
-                                     generateRecordId: Boolean,
-                                     isUsingIndex: Boolean,
-                                     isIndexCachingAllowed: Boolean,
-                                     inputSplitRecords: Option[Int],
-                                     inputSplitSizeMB: Option[Int],
-                                     improveLocality: Boolean,
-                                     optimizeAllocation: Boolean,
-                                     inputFileNameColumn: String,
-                                     occursMappings: Map[String, Map[String, Int]]
+                                     isRecordSequence: Boolean, // [deprecated by recordFormat]
+                                     bdw: Option[Bdw],
+                                     isRdwBigEndian: Boolean,
+                                     isRdwPartRecLength: Boolean,
+                                     rdwAdjustment: Int,
+                                     recordHeaderParser: Option[String],
+                                     recordExtractor: Option[String],
+                                     rhpAdditionalInfo: Option[String],
+                                     reAdditionalInfo: String,
+                                     recordLengthField: String,
+                                     recordLengthMap: Map[String, Int],
+                                     fileStartOffset: Int,
+                                     fileEndOffset: Int,
+                                     generateRecordId: Boolean,
+                                     isUsingIndex: Boolean,
+                                     isIndexCachingAllowed: Boolean,
+                                     inputSplitRecords: Option[Int],
+                                     inputSplitSizeMB: Option[Int],
+                                     inputSplitSizeCompressedMB: Option[Int],
+                                     improveLocality: Boolean,
+                                     optimizeAllocation: Boolean,
+                                     inputFileNameColumn: String,
+                                     occursMappings: Map[String, Map[String, Int]]
                                    )

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 1 addition & 6 deletions

@@ -205,12 +205,7 @@ private[cobol] object IndexBuilder extends Logging {
 
     val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config)
     val index = try {
-      if (inputStream.isCompressed) {
-        val element = SparseIndexEntry(0, -1, fileOrder, 0L)
-        ArrayBuffer[SparseIndexEntry](element)
-      } else {
-        reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)
-      }
+      reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)
    } finally {
      inputStream.close()
      headerStream.close()
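Previously a compressed stream bypassed index generation and produced a single whole-file entry; now `generateIndex` runs for compressed streams as well, so a compressed file can be split into multiple partitions. A sketch of the difference in the produced index; the `SparseIndexEntry` shape follows the removed code above, and the field names and offsets are illustrative assumptions, not the library's actual values:

    import scala.collection.mutable.ArrayBuffer

    // Local stand-in for the real SparseIndexEntry; -1 as the end offset means "until end of file".
    case class SparseIndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int, recordIndex: Long)

    // Before: one unsplittable entry per compressed file (the removed branch above).
    val before = ArrayBuffer(SparseIndexEntry(0L, -1L, 0, 0L))

    // After: generateIndex can emit many entries per compressed file, each becoming its own partition.
    val after = ArrayBuffer(
      SparseIndexEntry(0L,          99999999L, 0, 0L),
      SparseIndexEntry(99999999L,  199999999L, 0, 1200000L),
      SparseIndexEntry(199999999L,        -1L, 0, 2400000L)
    )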
