16 changes: 8 additions & 8 deletions README.md
@@ -1587,10 +1587,13 @@ The output looks like this:

##### ASCII files options (for record_format = D or D2)

| Option (usage example) | Description |
|----------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| .option("record_format", "D") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `record_format = D`, multisegment and hierarchical text record files can be loaded. |
| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. |
| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add mo than 1 such option for multiple code page overrides. |
| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. It is used to skip ASCII lines that contains invalid records, an EOF character, for example. |

##### Multisegment files options

@@ -1621,10 +1624,7 @@ The output looks like this:
| .option("pedantic", "false") | If 'true' Cobrix will throw an exception is an unknown option is encountered. If 'false' (default), unknown options will be logged as an error without failing Spark Application. |
| .option("debug_layout_positions", "true") | If 'true' Cobrix will generate and log layout positions table when reading data. |
| .option("debug_ignore_file_size", "true") | If 'true' no exception will be thrown if record size does not match file size. Useful for debugging copybooks to make them match a data file. |
| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. |
| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add mo than 1 such option for multiple code page overrides. |
| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. It is used to skip ASCII lines that contains invalid records, an EOF character, for example. |
| .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. |
| .option("enable_self_checks", "true") | If 'true' (default) Cobrix will run self-checks that might slightly slow performance. The only check implemented so far is custom record extractor indexing compatibility check. |

##### Currently supported EBCDIC code pages

@@ -55,9 +55,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],

def recordExtractor(startingRecordNumber: Long,
dataStream: SimpleStream,
headerStream: SimpleStream,
copybook: Copybook
): Option[RawRecordExtractor] = {
headerStream: SimpleStream): Option[RawRecordExtractor] = {
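// The copybook is no longer passed in explicitly; it is taken from cobolSchema when building RawRecordContext below.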
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)

val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)
@@ -66,7 +64,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment))
val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams))

val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)
val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)

readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
@@ -113,7 +111,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream),
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -123,7 +121,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream),
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -178,7 +176,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, dataStream, headerStream, copybook),
recordExtractor(0L, dataStream, headerStream),
inputSplitSizeRecords,
inputSplitSizeMB,
Some(copybook),
@@ -189,7 +187,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, dataStream, headerStream, copybook),
recordExtractor(0L, dataStream, headerStream),
inputSplitSizeRecords,
inputSplitSizeMB,
None,
@@ -102,5 +102,6 @@ case class CobolParameters(
debugFieldsPolicy: DebugFieldsPolicy,
debugIgnoreFileSize: Boolean,
debugLayoutPositions: Boolean,
enableSelfChecks: Boolean,
metadataPolicy: MetadataPolicy
)
@@ -117,6 +117,7 @@ case class ReaderParameters(
decodeBinaryAsHex: Boolean = false,
dropGroupFillers: Boolean = false,
dropValueFillers: Boolean = true,
enableSelfChecks: Boolean = true,
fillerNamingPolicy: FillerNamingPolicy = FillerNamingPolicy.SequenceNumbers,
nonTerminals: Seq[String] = Nil,
occursMappings: Map[String, Map[String, Int]] = Map(),
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.11
@@ -128,6 +128,7 @@ object CobolParametersParser extends Logging {
// Parameters for debugging
val PARAM_DEBUG_LAYOUT_POSITIONS = "debug_layout_positions"
val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size"
val PARAM_ENABLE_SELF_CHECKS = "enable_self_checks"

private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = {
val schemaRetentionPolicyName = params.getOrElse(PARAM_SCHEMA_RETENTION_POLICY, "collapse_root")
@@ -284,6 +285,7 @@ object CobolParametersParser extends Logging {
getDebuggingFieldsPolicy(recordFormat, params),
params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean,
params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean,
params.getOrElse(PARAM_ENABLE_SELF_CHECKS, "true").toBoolean,
MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic"))
)
validateSparkCobolOptions(params, recordFormat)
@@ -416,6 +418,7 @@ object CobolParametersParser extends Logging {
parameters.decodeBinaryAsHex,
parameters.dropGroupFillers,
parameters.dropValueFillers,
parameters.enableSelfChecks,
parameters.fillerNamingPolicy,
parameters.nonTerminals,
parameters.occursMappings,
@@ -60,7 +60,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
dataStream,
getReaderProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream),
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -72,7 +72,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
dataStream,
getReaderProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream),
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -25,6 +25,8 @@ import org.apache.spark.sql.SQLContext
import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.reader.common.Constants
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
import za.co.absa.cobrix.cobol.reader.{VarLenNestedReader => ReaderVarLenNestedReader}
import za.co.absa.cobrix.spark.cobol.reader.{Reader, VarLenReader}
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
@@ -111,10 +113,15 @@ private[source] object IndexBuilder extends Logging {
private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder],
reader: VarLenReader,
sqlContext: SQLContext): RDD[SparseIndexEntry] = {
val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
val conf = sqlContext.sparkContext.hadoopConfiguration
val sconf = new SerializableConfiguration(conf)

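// Before building the index, check on the first file that the configured record extractor can resume
// reading from a saved offset (see selfCheckForIndexCompatibility below).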
if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) {
selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf)
}

val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)

val indexRDD = filesRDD.mapPartitions(
partition => {
partition.flatMap(row => {
@@ -149,36 +156,113 @@ private[source] object IndexBuilder extends Logging {
config: Configuration,
reader: VarLenReader): ArrayBuffer[SparseIndexEntry] = {
val filePath = fileWithOrder.filePath
val path = new Path(filePath)
val fileOrder = fileWithOrder.order
val startOffset = reader.getReaderProperties.fileStartOffset
val endOffset = reader.getReaderProperties.fileEndOffset

logger.info(s"Going to generate index for the file: $filePath")

val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config)
val index = reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)

val indexWithEndOffset = if (maximumBytes > 0 ){
index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
} else {
index
}

indexWithEndOffset
}

private[cobol] def getStreams(filePath: String,
fileStartOffset: Long,
fileEndOffset: Long,
config: Configuration): (SimpleStream, SimpleStream, Long) = {
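// Opens two streams over the same file: a data stream starting at fileStartOffset (capped at maximumBytes
// when fileEndOffset is non-zero) and a header stream that always starts at the beginning of the file.
// The third element of the returned tuple is the number of bytes to read (0 means read to the end).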
val path = new Path(filePath)
val fileSystem = path.getFileSystem(config)

val startOffset = reader.getReaderProperties.fileStartOffset
val maximumBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
val startOffset = fileStartOffset
val maximumBytes = if (fileEndOffset == 0) {
0
} else {
val bytesToRead = fileSystem.getContentSummary(path).getLength - reader.getReaderProperties.fileEndOffset - startOffset
val bytesToRead = fileSystem.getContentSummary(path).getLength - fileEndOffset - startOffset
if (bytesToRead < 0)
0
else
bytesToRead
}

logger.info(s"Going to generate index for the file: $filePath")
val inputStream = new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
val headerStream = new FileStreamer(filePath, fileSystem)
val index = reader.generateIndex(inputStream, headerStream,
fileOrder, reader.isRdwBigEndian)

val indexWithEndOffset = if (maximumBytes > 0 ){
index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
} else {
index
}

indexWithEndOffset
(inputStream, headerStream, maximumBytes)
}

private[cobol] def selfCheckForIndexCompatibility(reader: VarLenReader, filePath: String, config: Configuration): Unit = {
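// The check: read the first two records from the beginning of the file, remember the offset of the
// second record, then re-open the streams at that offset and verify the extractor returns the same
// second record. Extractors that cannot resume from a saved offset do not support sparse indexing.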
if (!reader.isInstanceOf[ReaderVarLenNestedReader[_]])
return

val readerProperties = reader.getReaderProperties

val startOffset = readerProperties.fileStartOffset
val endOffset = readerProperties.fileEndOffset

readerProperties.recordExtractor.foreach { recordExtractorClass =>
val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config)

val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)

var offset = -1L
var record = Array[Byte]()

extractorOpt.foreach { extractor =>
if (extractor.hasNext) {
// Getting the first record, if available
extractor.next()
offset = extractor.offset // Saving offset to jump to later

if (extractor.hasNext) {
// Getting the second record, if available
record = extractor.next() // Saving the record to check later

dataStream.close()
headerStream.close()

// Getting new streams and record extractor that points directly to the second record
val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)

extractorOpt2.foreach { extractor2 =>
if (!extractor2.hasNext) {
// If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
throw new RuntimeException(
s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
"Please use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
}

// Getting the second record from the extractor pointing to the second record offset at the start.
val expectedRecord = extractor2.next()

if (!expectedRecord.sameElements(record)) {
// Records should match. If they don't, the record extractor is faulty in terms of indexing support.
throw new RuntimeException(
s"Record extractor self-check failed. The record extractor returned a wrong record when started from a non-zero offset. " +
"Please use 'enable_indexes = false'. " +
s"File: $filePath, offset: $offset"
)
} else {
logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
}
dataStream2.close()
headerStream2.close()
}
}
}
}
}
}

private[cobol] def getBlockLengthByIndexEntry(entry: SparseIndexEntry): Long = {
val indexedLength = if (entry.offsetTo - entry.offsetFrom > 0)