diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a2d7c0396..30fb20ee2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -34,7 +34,6 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - - uses: coursier/cache-action@v5 - name: Setup JDK uses: actions/setup-java@v4 with: diff --git a/README.md b/README.md index 3c6a6b374..56d1c527d 100644 --- a/README.md +++ b/README.md @@ -1587,10 +1587,13 @@ The output looks like this: ##### ASCII files options (for record_format = D or D2) -| Option (usage example) | Description | -|-------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| .option("record_format", "D") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). | -| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `record_format = D`, multisegment and hierarchical text record files can be loaded. | +| Option (usage example) | Description | +|----------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| .option("record_format", "D") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). | +| .option("is_text", "true") | If 'true' the file will be considered a text file where records are separated by an end-of-line character. Currently, only ASCII files having UTF-8 charset can be processed this way. If combined with `record_format = D`, multisegment and hierarchical text record files can be loaded. | +| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. | +| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add more than 1 such option for multiple code page overrides. | +| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. It is used to skip ASCII lines that contains invalid records, an EOF character, for example. | ##### Multisegment files options @@ -1621,10 +1624,7 @@ The output looks like this: | .option("pedantic", "false") | If 'true' Cobrix will throw an exception is an unknown option is encountered. If 'false' (default), unknown options will be logged as an error without failing Spark Application. 
| | .option("debug_layout_positions", "true") | If 'true' Cobrix will generate and log layout positions table when reading data. | | .option("debug_ignore_file_size", "true") | If 'true' no exception will be thrown if record size does not match file size. Useful for debugging copybooks to make them match a data file. | -| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. | -| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add mo than 1 such option for multiple code page overrides. | -| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. It is used to skip ASCII lines that contains invalid records, an EOF character, for example. | -| .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. | +| .option("enable_self_checks", "true") | If 'true' (default) Cobrix will run self-checks to validate internal consistency. Note: Enabling this option may impact performance, especially for large datasets. It is recommended to disable this option in performance-critical environments. The only check implemented so far is custom record extractor indexing compatibility check. | ##### Currently supported EBCDIC code pages diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 066d51463..5754976c8 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -55,9 +55,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], def recordExtractor(startingRecordNumber: Long, dataStream: SimpleStream, - headerStream: SimpleStream, - copybook: Copybook - ): Option[RawRecordExtractor] = { + headerStream: SimpleStream): Option[RawRecordExtractor] = { val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment) val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams) @@ -66,7 +64,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment)) val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams)) - val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) + val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) readerProperties.recordExtractor match { case Some(recordExtractorClass) => @@ -113,7 +111,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], dataStream, readerProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream), fileNumber, startingRecordIndex, startingFileOffset, @@ -123,7 +121,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], dataStream, readerProperties, recordHeaderParser, - 
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream), fileNumber, startingRecordIndex, startingFileOffset, @@ -178,7 +176,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], dataStream, readerProperties.fileStartOffset, recordHeaderParser, - recordExtractor(0L, dataStream, headerStream, copybook), + recordExtractor(0L, dataStream, headerStream), inputSplitSizeRecords, inputSplitSizeMB, Some(copybook), @@ -189,7 +187,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], dataStream, readerProperties.fileStartOffset, recordHeaderParser, - recordExtractor(0L, dataStream, headerStream, copybook), + recordExtractor(0L, dataStream, headerStream), inputSplitSizeRecords, inputSplitSizeMB, None, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index 8a241dd5e..8b1a21ee2 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -102,5 +102,6 @@ case class CobolParameters( debugFieldsPolicy: DebugFieldsPolicy, debugIgnoreFileSize: Boolean, debugLayoutPositions: Boolean, + enableSelfChecks: Boolean, metadataPolicy: MetadataPolicy ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 36c546ffa..bcd9c11b3 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -117,6 +117,7 @@ case class ReaderParameters( decodeBinaryAsHex: Boolean = false, dropGroupFillers: Boolean = false, dropValueFillers: Boolean = true, + enableSelfChecks: Boolean = true, fillerNamingPolicy: FillerNamingPolicy = FillerNamingPolicy.SequenceNumbers, nonTerminals: Seq[String] = Nil, occursMappings: Map[String, Map[String, Int]] = Map(), diff --git a/project/build.properties b/project/build.properties index 04267b14a..cc68b53f1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.9 +sbt.version=1.10.11 diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index 70e7b86f6..6b4e7c9d3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -128,6 +128,7 @@ object CobolParametersParser extends Logging { // Parameters for debugging val PARAM_DEBUG_LAYOUT_POSITIONS = "debug_layout_positions" val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size" + val PARAM_ENABLE_SELF_CHECKS = "enable_self_checks" private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = { val schemaRetentionPolicyName = params.getOrElse(PARAM_SCHEMA_RETENTION_POLICY, "collapse_root") @@ -284,6 +285,7 @@ object CobolParametersParser extends Logging { getDebuggingFieldsPolicy(recordFormat, params), params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean, 
params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean, + params.getOrElse(PARAM_ENABLE_SELF_CHECKS, "true").toBoolean, MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")) ) validateSparkCobolOptions(params, recordFormat) @@ -416,6 +418,7 @@ object CobolParametersParser extends Logging { parameters.decodeBinaryAsHex, parameters.dropGroupFillers, parameters.dropValueFillers, + parameters.enableSelfChecks, parameters.fillerNamingPolicy, parameters.nonTerminals, parameters.occursMappings, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala index 463d140ec..d9aa80367 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala @@ -60,7 +60,7 @@ final class VarLenNestedReader(copybookContents: Seq[String], dataStream, getReaderProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream), fileNumber, startingRecordIndex, startingFileOffset, @@ -72,7 +72,7 @@ final class VarLenNestedReader(copybookContents: Seq[String], dataStream, getReaderProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream), fileNumber, startingRecordIndex, startingFileOffset, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala index 413a0f359..27084d8e9 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.SQLContext import za.co.absa.cobrix.cobol.internal.Logging import za.co.absa.cobrix.cobol.reader.common.Constants import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream +import za.co.absa.cobrix.cobol.reader.{VarLenNestedReader => ReaderVarLenNestedReader} import za.co.absa.cobrix.spark.cobol.reader.{Reader, VarLenReader} import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters @@ -111,10 +113,15 @@ private[source] object IndexBuilder extends Logging { private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder], reader: VarLenReader, sqlContext: SQLContext): RDD[SparseIndexEntry] = { - val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length) val conf = sqlContext.sparkContext.hadoopConfiguration val sconf = new SerializableConfiguration(conf) + if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) { + selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf) + } + + val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length) + val indexRDD = filesRDD.mapPartitions( partition => { partition.flatMap(row => { @@ -149,36 +156,113 @@ private[source] object IndexBuilder extends Logging { config: Configuration, reader: VarLenReader): ArrayBuffer[SparseIndexEntry] = { val filePath = fileWithOrder.filePath - val path = new 
Path(filePath) val fileOrder = fileWithOrder.order + val startOffset = reader.getReaderProperties.fileStartOffset + val endOffset = reader.getReaderProperties.fileEndOffset + + logger.info(s"Going to generate index for the file: $filePath") + + val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config) + val index = reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian) + + val indexWithEndOffset = if (maximumBytes > 0 ){ + index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry) + } else { + index + } + + indexWithEndOffset + } + + private[cobol] def getStreams(filePath: String, + fileStartOffset: Long, + fileEndOffset: Long, + config: Configuration): (SimpleStream, SimpleStream, Long) = { + val path = new Path(filePath) val fileSystem = path.getFileSystem(config) - val startOffset = reader.getReaderProperties.fileStartOffset - val maximumBytes = if (reader.getReaderProperties.fileEndOffset == 0) { + val startOffset = fileStartOffset + val maximumBytes = if (fileEndOffset == 0) { 0 } else { - val bytesToRead = fileSystem.getContentSummary(path).getLength - reader.getReaderProperties.fileEndOffset - startOffset + val bytesToRead = fileSystem.getContentSummary(path).getLength - fileEndOffset - startOffset if (bytesToRead < 0) 0 else bytesToRead } - logger.info(s"Going to generate index for the file: $filePath") val inputStream = new FileStreamer(filePath, fileSystem, startOffset, maximumBytes) val headerStream = new FileStreamer(filePath, fileSystem) - val index = reader.generateIndex(inputStream, headerStream, - fileOrder, reader.isRdwBigEndian) - val indexWithEndOffset = if (maximumBytes > 0 ){ - index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry) - } else { - index - } - - indexWithEndOffset + (inputStream, headerStream, maximumBytes) } + private[cobol] def selfCheckForIndexCompatibility(reader: VarLenReader, filePath: String, config: Configuration): Unit = { + if (!reader.isInstanceOf[ReaderVarLenNestedReader[_]]) + return + + val readerProperties = reader.getReaderProperties + + val startOffset = readerProperties.fileStartOffset + val endOffset = readerProperties.fileEndOffset + + readerProperties.recordExtractor.foreach { recordExtractorClass => + val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config) + + val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream) + + var offset = -1L + var record = Array[Byte]() + + extractorOpt.foreach { extractor => + if (extractor.hasNext) { + // Getting the first record, if available + extractor.next() + offset = extractor.offset // Saving offset to jump to later + + if (extractor.hasNext) { + // Getting the second record, if available + record = extractor.next() // Saving the record to check later + + dataStream.close() + headerStream.close() + + // Getting new streams and record extractor that points directly to the second record + val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config) + val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2) + + extractorOpt2.foreach { extractor2 => + if (!extractor2.hasNext) { + // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support. + throw new RuntimeException( + s"Record extractor self-check failed. 
When reading from a non-zero offset the extractor returned hasNext()=false. " + + "Please use 'enable_indexes = false'. " + + s"File: $filePath, offset: $offset" + ) + } + + // Getting the second record from the extractor that starts directly at the second record's offset. + val expectedRecord = extractor2.next() + + if (!expectedRecord.sameElements(record)) { + // Records should match. If they don't, the record extractor is faulty in terms of indexing support. + throw new RuntimeException( + s"Record extractor self-check failed. The record extractor returned wrong record when started from non-zero offset. " + + "Please use 'enable_indexes = false'. " + + s"File: $filePath, offset: $offset" + ) + } else { + logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset") + } + dataStream2.close() + headerStream2.close() + } + } + } + } + } + } private[cobol] def getBlockLengthByIndexEntry(entry: SparseIndexEntry): Long = { val indexedLength = if (entry.offsetTo - entry.offsetFrom > 0) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractor.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractor.scala new file mode 100644 index 000000000..83e9e5e8e --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractor.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.mocks + +import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} + +/** + * This record extractor assumes each record has a size of 2 bytes. + */ +class FixedRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + + private var recordNumber = ctx.startingRecordNumber + + override def offset: Long = ctx.inputStream.offset + + override def hasNext: Boolean = !ctx.inputStream.isEndOfStream + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + + val rawRecord = ctx.inputStream.next(2) + + recordNumber += 1 + + rawRecord + } + +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorBroken.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorBroken.scala new file mode 100644 index 000000000..4895242bf --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorBroken.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.mocks + +import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} + +/** + * This record extractor returns hasNext=false when started from a non-zero offset. + */ +class FixedRecordExtractorBroken(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + + private var recordNumber = ctx.startingRecordNumber + + private val startingOffset = ctx.inputStream.offset + + override def offset: Long = ctx.inputStream.offset + + override def hasNext: Boolean = startingOffset == 0 && !ctx.inputStream.isEndOfStream + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + + val rawRecord = ctx.inputStream.next(2) + + recordNumber += 1 + + rawRecord + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorNoIndex.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorNoIndex.scala new file mode 100644 index 000000000..0d259fe38 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/FixedRecordExtractorNoIndex.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.mocks + +import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} + +/** + * This record extractor assumes each record has a size of 2 bytes. + * + * This record extractor is not index compatible. + */ +class FixedRecordExtractorNoIndex(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + + private var currentOffset = ctx.inputStream.offset + private var recordNumber = ctx.startingRecordNumber + + private var currentRecord = fetchRecord() + + // This record extractor does not support indexes because offset() does not point to the start of the next record. + // Since the next record is fetched eagerly, offset() points past that pre-fetched record rather than to its beginning. + override def offset: Long = currentOffset + + override def hasNext: Boolean = currentRecord.nonEmpty + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + + val rawRecord = currentRecord.get + + // In order to support indexes, the next 2 lines should be reversed. 
+ currentRecord = fetchRecord() + currentOffset = ctx.inputStream.offset + + recordNumber += 1 + + rawRecord + } + + def fetchRecord(): Option[Array[Byte]] = { + if (ctx.inputStream.isEndOfStream) { + None + } else { + Option(ctx.inputStream.next(2)) + } + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39RecordExtractorSelfCheck.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39RecordExtractorSelfCheck.scala new file mode 100644 index 000000000..73f7a8bdd --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39RecordExtractorSelfCheck.scala @@ -0,0 +1,136 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.sql.DataFrame +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.spark.cobol.mocks.CustomRecordExtractorMock +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +class Test39RecordExtractorSelfCheck extends AnyWordSpec with SparkTestBase with BinaryFileFixture { + private val copybook = + """ 01 R. + 03 A PIC X(2). + """ + private val data = "AABBCCDDEEFF" + + "Record extractor supporting indexes" should { + "should work with indexes" in { + val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"},{"A":"DD"},{"A":"EE"},{"A":"FF"}]""" + + withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "enable_self_checks" -> "true", + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractor", + "input_split_records" -> "2") + ) + + val actual = df.orderBy("A").toJSON.collect().mkString("[", ",", "]") + + assert(actual == expected) + } + } + } + + "Record extractor not supporting indexes" should { + "should fail self checks when offsets are not properly handled" in { + withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "enable_self_checks" -> "true", + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex", + "input_split_records" -> "2") + ) + + val ex = intercept[RuntimeException] { + df.show(false) + df.count() + } + + assert(ex.getMessage.contains("Record extractor self-check failed. 
The record extractor returned wrong record when started from non-zero offset")) + assert(ex.getMessage.contains("offset: 4")) + } + } + + "should fail self checks when the extractor returns hasNext=false unexpectedly" in { + withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "enable_self_checks" -> "true", + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorBroken", + "input_split_records" -> "2") + ) + + val ex = intercept[RuntimeException] { + df.show(false) + df.count() + } + + assert(ex.getMessage.contains("Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false")) + assert(ex.getMessage.contains("offset: 2")) + } + } + + "should still work if self checks is turned off" in { + withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "enable_self_checks" -> "false", + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex", + "input_split_records" -> "2") + ) + + // No guarantees regarding the correct record count at this point + assert(df.count() > 4) + } + } + + "should still work if there is just one record" in { + withTempBinFile("custom_re", ".dat", "AA".getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "enable_self_checks" -> "true", + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex") + ) + + assert(df.count() == 1) + } + } + + "should still work if indexes are disabled" in { + val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"},{"A":"DD"},{"A":"EE"},{"A":"FF"}]""" + + withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName => + val df = getDataFrame(tmpFileName, Map( + "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex", + "enable_indexes" -> "false") + ) + + val actual = df.orderBy("A").toJSON.collect().mkString("[", ",", "]") + + assert(actual == expected) + } + } + } + + private def getDataFrame(inputPath: String, extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = { + spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .options(extraOptions) + .load(inputPath) + } +}
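
Below is a minimal end-user sketch of the `enable_self_checks` option documented and implemented in this change. It is illustrative only: the copybook and the `/path/to/records.dat` path are placeholders, and the extractor class is the 2-byte test mock added by this patch (in practice it would be a user-supplied `RawRecordExtractor` implementation); all option names are taken from the README table and `Test39RecordExtractorSelfCheck` above.

```scala
// Sketch: reads a file with a custom record extractor while the indexing
// self-check (enabled by default) verifies that the extractor returns the same
// second record whether reading starts from offset 0 or from the saved offset.
val copybook =
  """ 01 R.
       03 A PIC X(2).
  """

val df = spark.read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("encoding", "ascii")
  .option("record_extractor", "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractor")
  .option("enable_self_checks", "true")   // default; set to "false" in performance-critical jobs
  .option("input_split_records", "2")
  .load("/path/to/records.dat")           // placeholder path

df.show(false)
```

If the extractor is not index-compatible, the self-check throws a `RuntimeException` suggesting `enable_indexes = false`; disabling either indexes or the self-check, as exercised in the test cases above, keeps such extractors usable.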