diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala new file mode 100644 index 000000000..216097be8 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala @@ -0,0 +1,132 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.processor + +import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor} +import za.co.absa.cobrix.cobol.reader.VarLenNestedReader +import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor +import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters} +import za.co.absa.cobrix.cobol.reader.schema.CobolSchema +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream + +import java.io.OutputStream +import scala.collection.mutable + + +/** + * A trait that defines a processor for raw COBOL data streams. + * It provides a method to process a COBOL file or a stream, provided record processor. + */ +trait CobolProcessor { + /** + * Processes the input stream of COBOL records and writes the output to the specified output stream. + * + * @param inputStream the input stream containing raw COBOL records. + * @param outputStream the output stream where processed records will be written. + * @param rawRecordProcessor the processor that processes each raw record. + */ + def process(inputStream: SimpleStream, + outputStream: OutputStream) + (rawRecordProcessor: RawRecordProcessor): Unit + +} + +object CobolProcessor { + class CobolProcessorBuilder(copybookContents: String) { + private val caseInsensitiveOptions = new mutable.HashMap[String, String]() + + def build(): CobolProcessor = { + val readerParameters = getReaderParameters + val cobolSchema = getCobolSchema(readerParameters) + + new CobolProcessor { + override def process(inputStream: SimpleStream, + outputStream: OutputStream) + (rawRecordProcessor: RawRecordProcessor): Unit = { + val recordExtractor = getRecordExtractor(readerParameters, inputStream) + + val dataStream = inputStream.copyStream() + try { + StreamProcessor.processStream(cobolSchema.copybook, + caseInsensitiveOptions.toMap, + dataStream, + recordExtractor, + rawRecordProcessor, + outputStream) + } finally { + dataStream.close() + } + } + } + } + + /** + * Adds a single option to the builder. + * + * @param key the option key. + * @param value the option value. + * @return this builder instance for method chaining. + */ + def option(key: String, value: String): CobolProcessorBuilder = { + require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only") + caseInsensitiveOptions += (key.trim.toLowerCase -> value) + this + } + + /** + * Adds multiple options to the builder. + * + * @param options a map of option key-value pairs. + * @return this builder instance for method chaining. 
+ */ + def options(options: Map[String, String]): CobolProcessorBuilder = { + caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2)) + this + } + + private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = { + CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters) + } + + private[processor] def getReaderParameters: ReaderParameters = { + val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap)) + + CobolParametersParser.getReaderProperties(cobolParameters, None) + } + + private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = { + val dataStream = inputStream.copyStream() + val headerStream = inputStream.copyStream() + + val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler) + + reader.recordExtractor(0, dataStream, headerStream) match { + case Some(extractor) => extractor + case None => + throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " + + "Please check the copybook and the reader parameters." + ) + } + } + + private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap + } + + def builder(copybookContent: String): CobolProcessorBuilder = { + new CobolProcessorBuilder(copybookContent) + } +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala deleted file mode 100644 index e92ee0c98..000000000 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.cobrix.cobol.processor - -import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength -import za.co.absa.cobrix.cobol.reader.VarLenNestedReader -import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, RawRecordContext, RawRecordExtractor} -import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters} -import za.co.absa.cobrix.cobol.reader.schema.CobolSchema -import za.co.absa.cobrix.cobol.reader.stream.SimpleStream - -import java.io.OutputStream -import scala.collection.mutable -import scala.reflect.ClassTag - -object RecordProcessorBuilder { - /** - * Creates a new instance of the RecordProcessorBuilder with the given copybook content. - * The instabce is used to create an COBOL data processor allowing applying changes to a mainrame file - * without changing the original format. - * - * @param copybookContent the COBOL copybook content as a string. - * @return a new RecordProcessorBuilder instance. 
- */ - def copybookContents(copybookContent: String): RecordProcessorBuilder = { - new RecordProcessorBuilder(copybookContent) - } -} - -class RecordProcessorBuilder(copybookContents: String) { - private val caseInsensitiveOptions = new mutable.HashMap[String, String]() - - /** - * Adds a single option to the builder. - * - * @param key the option key. - * @param value the option value. - * @return this builder instance for method chaining. - */ - def option(key: String, value: String): RecordProcessorBuilder = { - caseInsensitiveOptions += (key.toLowerCase -> value) - this - } - - /** - * Adds multiple options to the builder. - * - * @param options a map of option key-value pairs. - * @return this builder instance for method chaining. - */ - def options(options: Map[String, String]): RecordProcessorBuilder = { - caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2)) - this - } - - /** - * Processes the input stream of COBOL records and writes the output to the specified output stream. - * - * @param inputStream the input stream containing raw COBOL records. - * @param outputStream the output stream where processed records will be written. - * @param rawRecordProcessor the processor that processes each raw record. - */ - def process(inputStream: SimpleStream, - outputStream: OutputStream) - (rawRecordProcessor: RawRecordProcessor): Unit = { - val readerParameters = getReaderParameters - val cobolSchema = getCobolSchema(readerParameters) - val recordExtractor = getRecordExtractor(readerParameters, inputStream) - - val dataStream = inputStream.copyStream() - try { - StreamProcessor.processStream(cobolSchema.copybook, - caseInsensitiveOptions.toMap, - dataStream, - recordExtractor, - rawRecordProcessor, - outputStream) - } finally { - dataStream.close() - } - } - - private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = { - CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters) - } - - private[processor] def getReaderParameters: ReaderParameters = { - val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap)) - - CobolParametersParser.getReaderProperties(cobolParameters, None) - } - - private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = { - val dataStream = inputStream.copyStream() - val headerStream = inputStream.copyStream() - - val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler) - - reader.recordExtractor(0, dataStream, headerStream) match { - case Some(extractor) => extractor - case None if readerParameters.recordFormat == FixedLength => - val dataStream = inputStream.copyStream() - val headerStream = inputStream.copyStream() - val ctx = RawRecordContext(0, dataStream, headerStream, getCobolSchema(readerParameters).copybook, null, null, "") - new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength) - case None => - throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " + - "Please check the copybook and the reader parameters." 
- ) - } - } - - private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap -} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/ArrayOfAnyHandler.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala similarity index 96% rename from cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/ArrayOfAnyHandler.scala rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala index ffa305879..78e5e35ba 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/ArrayOfAnyHandler.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.cobrix.cobol.processor +package za.co.absa.cobrix.cobol.processor.impl import za.co.absa.cobrix.cobol.parser.ast.Group import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/MapOfAnyHandler.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala similarity index 96% rename from cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/MapOfAnyHandler.scala rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala index 52ec1178b..5fd691c0a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/MapOfAnyHandler.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.cobrix.cobol.processor +package za.co.absa.cobrix.cobol.processor.impl import za.co.absa.cobrix.cobol.parser.ast.Group import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/StreamProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala similarity index 77% rename from cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/StreamProcessor.scala rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala index 2dcdf3b57..e9d843473 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/StreamProcessor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala @@ -14,9 +14,10 @@ * limitations under the License. */ -package za.co.absa.cobrix.cobol.processor +package za.co.absa.cobrix.cobol.processor.impl import za.co.absa.cobrix.cobol.parser.Copybook +import za.co.absa.cobrix.cobol.processor.RawRecordProcessor import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor import za.co.absa.cobrix.cobol.reader.stream.SimpleStream @@ -26,12 +27,13 @@ object StreamProcessor { /** * Processes a stream of COBOL raw records and writes it back in the same format as the input data. * - * @param copybook the COBOL copybook that describes the schema of the records. - * @param options arbitrary options used for splitting input data into records. Same as options to 'spark-cobol'. Can contain custom options as well. - * @param inputStream the input stream containing the raw COBOL records. + * @param copybook the COBOL copybook that describes the schema of the records. + * @param options arbitrary options used for splitting input data into records (same as 'spark-cobol' options). 
+ * Keys are lower-cased for case-insensitive handling. Can contain custom options as well. + * @param inputStream the input stream containing the raw COBOL records. * @param recordExtractor the extractor that extracts raw records from the input stream. * @param recordProcessor the per-record processing logic implementation. - * @param outputStream the output stream where the processed records will be written. + * @param outputStream the output stream where the processed records will be written. */ def processStream(copybook: Copybook, options: Map[String, String], @@ -39,9 +41,7 @@ object StreamProcessor { recordExtractor: RawRecordExtractor, recordProcessor: RawRecordProcessor, outputStream: OutputStream): Unit = { - var i = 0 while (recordExtractor.hasNext) { - i += 1 val record = recordExtractor.next() val recordSize = record.length diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 5754976c8..5c327cf18 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -17,7 +17,6 @@ package za.co.absa.cobrix.cobol.reader import za.co.absa.cobrix.cobol.internal.Logging -import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory} import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, FixedLength, VariableBlock, VariableLength} @@ -27,7 +26,6 @@ import za.co.absa.cobrix.cobol.reader.index.IndexGenerator import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry import za.co.absa.cobrix.cobol.reader.iterator.{VarLenHierarchicalIterator, VarLenNestedIterator} import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters -import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema import za.co.absa.cobrix.cobol.reader.stream.SimpleStream import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator @@ -56,15 +54,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], def recordExtractor(startingRecordNumber: Long, dataStream: SimpleStream, headerStream: SimpleStream): Option[RawRecordExtractor] = { - val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment) - - val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams) - val bdwOpt = readerProperties.bdw - val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment)) - val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams)) - - val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) + val reParams = RawRecordContext.builder(startingRecordNumber, dataStream, headerStream, cobolSchema.copybook) + .withReaderParams(readerProperties) + .build() readerProperties.recordExtractor match { case Some(recordExtractorClass) => @@ -88,6 +81,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], !readerProperties.isRecordSequence && readerProperties.lengthFieldExpression.isEmpty => Some(new 
VarOccursRecordExtractor(reParams)) + case None if readerProperties.recordFormat == FixedLength => + Some(new FixedRecordLengthRawRecordExtractor(reParams, readerProperties.recordLength)) case None => None } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala index 1d3bbf334..fdec38ac6 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala @@ -17,18 +17,20 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import za.co.absa.cobrix.cobol.parser.Copybook -import za.co.absa.cobrix.cobol.reader.recordheader.RecordHeaderDecoder +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoder, RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParameters} import za.co.absa.cobrix.cobol.reader.stream.SimpleStream /** * @param startingRecordNumber A record number the input stream is pointing to (zero-based). - * @param inputStream An input stream pointing to the beginning of a file or a record in a file. The + * @param inputStream An input stream pointing to the beginning of a file or a record in a file. The * record extractor should close the stream when the end of file is reached. * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing * to a record in the middle. The record extractor should close the stream when it * is no longer needed. * @param copybook A copybook of the input stream. * @param additionalInfo A string provided by a client for the raw record extractor. 
+ * @param options All options passed to `spark-cobol` */ case class RawRecordContext( startingRecordNumber: Long, @@ -37,5 +39,101 @@ case class RawRecordContext( copybook: Copybook, rdwDecoder: RecordHeaderDecoder, bdwDecoder: RecordHeaderDecoder, - additionalInfo: String + additionalInfo: String, + options: Map[String, String] ) + +object RawRecordContext { + class RawRecordContextBuilder(inputStream: SimpleStream, + headerStream: SimpleStream, + copybook: Copybook) { + private var context = RawRecordContext(0L, + inputStream, + headerStream, + copybook, + new RecordHeaderDecoderRdw(RecordHeaderParameters(isBigEndian = true, 0)), + new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = true, 0)), + "", + Map.empty[String, String] + ) + + def withReaderParams(readerParameters: ReaderParameters): RawRecordContextBuilder = { + val rdwParams = RecordHeaderParameters(readerParameters.isRdwBigEndian, readerParameters.rdwAdjustment) + + val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams) + + val bdwOpt = readerParameters.bdw + val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment)) + val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams)) + + withAdditionalInfo(readerParameters.reAdditionalInfo) + .withRdwDecoder(rdwDecoder) + .withBdwDecoder(bdwDecoderOpt.getOrElse(rdwDecoder)) + .withOptions(readerParameters.options) + } + + def withStartingRecordNumber(startingRecordNumber: Long): RawRecordContextBuilder = { + require(startingRecordNumber >= 0, s"startingRecordNumber must be >= 0, got: $startingRecordNumber") + context = context.copy(startingRecordNumber = startingRecordNumber) + this + } + + def withRdwDecoder(rdwDecoder: RecordHeaderDecoder): RawRecordContextBuilder = { + context = context.copy(rdwDecoder = rdwDecoder) + this + } + + def withBdwDecoder(bdwDecoder: RecordHeaderDecoder): RawRecordContextBuilder = { + context = context.copy(bdwDecoder = bdwDecoder) + this + } + + def withAdditionalInfo(additionalInfo: String): RawRecordContextBuilder = { + context = context.copy(additionalInfo = additionalInfo) + this + } + + def withOptions(options: Map[String, String]): RawRecordContextBuilder = { + context = context.copy(options = options) + this + } + + def build(): RawRecordContext = { + context + } + } + + /** + * Creates a new instance of `RawRecordContextBuilder` with the specified input stream, + * header stream, and copybook. The builder allows further customization of the + * `RawRecordContext` before building it. + * + * The header stream should always point to the beginning of a file. + * The input stream can point in the middle of the file when read not from the beginning. + * + * @param inputStream the main data stream containing the record data. + * @param headerStream the stream containing header information for the records. + * @param copybook the copybook defining the structure of the raw records. + * @return a new instance of `RawRecordContextBuilder`. + */ + def builder(startingRecordNumber: Long, + inputStream: SimpleStream, + headerStream: SimpleStream, + copybook: Copybook): RawRecordContextBuilder = { + val builder = new RawRecordContextBuilder(inputStream, headerStream, copybook) + + builder.withStartingRecordNumber(startingRecordNumber) + } + + /** + * Creates a new instance of `RawRecordContextBuilder` with the specified input stream, + * presuming the file is going to be read from beginning. + * + * @param inputStream the main data stream containing the record data. 
+ * @param copybook the copybook defining the structure of the raw records. + * @return a new instance of `RawRecordContextBuilder`. + */ + def builder(inputStream: SimpleStream, + copybook: Copybook): RawRecordContextBuilder = new RawRecordContextBuilder(inputStream, inputStream.copyStream(), copybook) + +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index 285a93087..f8331e716 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -63,6 +63,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param debugIgnoreFileSize If true the fixed length file reader won't check file size divisibility. Useful for debugging binary file / copybook mismatches. * @param debugLayoutPositions If true, layout positions for input files will be logged (false by default) * @param metadataPolicy Specifies the policy of metadat fields to be added to the Spark schema + * @param options Options passed to 'spark-cobol'. */ case class CobolParameters( copybookPath: Option[String], @@ -105,5 +106,6 @@ case class CobolParameters( debugIgnoreFileSize: Boolean, debugLayoutPositions: Boolean, enableSelfChecks: Boolean, - metadataPolicy: MetadataPolicy + metadataPolicy: MetadataPolicy, + options: Map[String, String] ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index 89effdd4b..c8a54e750 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -303,7 +303,8 @@ object CobolParametersParser extends Logging { params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean, params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean, params.getOrElse(PARAM_ENABLE_SELF_CHECKS, "false").toBoolean, - MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")) + MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")), + params.getMap ) validateSparkCobolOptions(params, recordFormat) cobolParameters @@ -446,7 +447,8 @@ object CobolParametersParser extends Logging { varLenParams.rhpAdditionalInfo, varLenParams.reAdditionalInfo, varLenParams.inputFileNameColumn, - parameters.metadataPolicy + parameters.metadataPolicy, + parameters.options ) } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index c1daa3a44..73c3c8b0a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -75,6 +75,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser * @param inputFileNameColumn A column name to add to the dataframe. 
The column will contain input file name for each record similar to 'input_file_name()' function * @param metadataPolicy Specifies the policy of metadat fields to be added to the Spark schema + * @param options Options passed to spark-cobol */ case class ReaderParameters( recordFormat: RecordFormat = FixedLength, @@ -129,5 +130,6 @@ case class ReaderParameters( rhpAdditionalInfo: Option[String] = None, reAdditionalInfo: String = "", inputFileNameColumn: String = "", - metadataPolicy: MetadataPolicy = MetadataPolicy.Basic + metadataPolicy: MetadataPolicy = MetadataPolicy.Basic, + options: Map[String, String] = Map.empty ) diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilderSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala similarity index 86% rename from cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilderSuite.scala rename to cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala index ef8dc3a15..b25e237bf 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilderSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala @@ -25,7 +25,7 @@ import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import java.io.ByteArrayOutputStream -class RecordProcessorBuilderSuite extends AnyWordSpec { +class CobolProcessorBuilderSuite extends AnyWordSpec { private val copybook = """ 01 RECORD. | 05 T PIC X. @@ -34,7 +34,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "process an input data stream into an output stream" in { val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) val os = new ByteArrayOutputStream(10) - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) val processor = new RawRecordProcessor { override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = { @@ -42,7 +42,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { } } - builder.process(is, os)(processor) + builder.build().process(is, os)(processor) val outputArray = os.toByteArray @@ -55,7 +55,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "getCobolSchema" should { "return the schema of the copybook provided" in { - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) val cobolSchema = builder.getCobolSchema(ReaderParameters()) @@ -65,10 +65,11 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "getReaderParameters" should { "return a reader according to passed options" in { - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) .option("record_format", "D") assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText) + assert(builder.getReaderParameters.options.contains("record_format")) assert(builder.getOptions.contains("record_format")) } } @@ -76,9 +77,9 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "getRecordExtractor" should { "work for an fixed-record-length files" in { val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) - val ext = 
builder.getRecordExtractor(ReaderParameters(recordLength = Some(2)), stream) + val ext = builder.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream) assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor]) @@ -90,7 +91,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "work for an variable-record-length files" in { val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) val ext = builder.getRecordExtractor(ReaderParameters( recordFormat = RecordFormat.VariableLength, @@ -102,7 +103,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec { "throw an exception on a non-supported record format for processing" in { val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = RecordProcessorBuilder.copybookContents(copybook) + val builder = CobolProcessor.builder(copybook) val ex = intercept[IllegalArgumentException] { builder.getRecordExtractor(ReaderParameters( diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala index 780e18700..f69b1e0ce 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/RecordExtractorDebugSpec.scala @@ -32,7 +32,7 @@ class RecordExtractorDebugSpec extends AnyWordSpec { val dataStream = new TestStringStream(data) val headerStream = new TestStringStream(data) - val ctx = RawRecordContext(0, dataStream, headerStream, null, null, null, "") + val ctx = RawRecordContext.builder(0L, dataStream, headerStream, null).build() val extractor = new CustomRecordExtractorMock(ctx) var i = 0 diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala index 15f98f939..c6febe3a3 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala @@ -63,7 +63,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new TextRecordExtractor(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, @@ -85,7 +85,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new TextFullRecordExtractor(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), 
recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, @@ -105,7 +105,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val dataStream = new TestStringStream(textFileContent) val headerStream = new TestStringStream(textFileContent) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None, @@ -148,7 +148,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val dataStream = new TestByteStream(data) val headerStream = new TestByteStream(data) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) recordExtractor.onReceiveAdditionalInfo("dummy") val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, @@ -178,7 +178,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { // Skip the first 2 bytes to the file offset dataStream.next(2) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 2L, @@ -207,7 +207,7 @@ class SparseIndexSpecSpec extends AnyWordSpec { val dataStream = new TestByteStream(data) val headerStream = new TestByteStream(data) - val recordExtractor = new RecordExtractorReadAhaedMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorReadAhaedMock(RawRecordContext.builder(0L, dataStream, headerStream, copybook).build()) val ex = intercept[IllegalStateException] { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala index 1bdcf0168..5f5cef0df 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala @@ -169,10 +169,7 @@ class FixedBlockRawRecordExtractorSuite extends AnyWordSpec { val ibs = new TestByteStream(bytes) val hbs = new TestByteStream(bytes) - val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) - val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) - - RawRecordContext(0, ibs, hbs, copybook, rdwDecoder, bdwDecoder, "") + RawRecordContext.builder(0L, ibs, hbs, copybook).build() } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala index 3290c0120..40ed0fd39 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala +++ 
b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala @@ -32,12 +32,9 @@ object RawRecordContextFactory { startingRecordNumber: Long = 0L, inputStream: SimpleStream = new TestStringStream("A1\nB2\n"), headerStream: SimpleStream = new TestStringStream("A1\nB2\n"), - copybook: Copybook = copybook, - rdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = false, 0)), - bdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderRdw(RecordHeaderParameters(isBigEndian = false, 0)), - additionalInfo: String = "" + copybook: Copybook = copybook ): RawRecordContext = { - RawRecordContext(startingRecordNumber, inputStream, headerStream, copybook, rdwDecoder, bdwDecoder, additionalInfo) + RawRecordContext.builder(startingRecordNumber, inputStream, headerStream, copybook).build() } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala index 6fc564190..dc1a7199b 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractorSuite.scala @@ -19,7 +19,6 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import org.scalatest.wordspec.AnyWordSpec import za.co.absa.cobrix.cobol.parser.CopybookParser import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream -import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory} class VarOccursRecordExtractorSuite extends AnyWordSpec { "variable occurs record extractor" should { @@ -42,7 +41,7 @@ class VarOccursRecordExtractorSuite extends AnyWordSpec { val ibs = new TestByteStream(recordData) val hbs = new TestByteStream(recordData) - val rc = RawRecordContext(0, ibs, hbs, copybook, null, null, "") + val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build() val extractor = new VarOccursRecordExtractor(rc) @@ -85,7 +84,7 @@ class VarOccursRecordExtractorSuite extends AnyWordSpec { val ibs = new TestByteStream(recordData) val hbs = new TestByteStream(recordData) - val rc = RawRecordContext(0, ibs, hbs, copybook, null, null, "") + val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build() val extractor = new VarOccursRecordExtractor(rc) @@ -141,7 +140,7 @@ class VarOccursRecordExtractorSuite extends AnyWordSpec { val ibs = new TestByteStream(recordData) val hbs = new TestByteStream(recordData) - val rc = RawRecordContext(0, ibs, hbs, copybook, null, null, "") + val rc = RawRecordContext.builder(0L, ibs, hbs, copybook).build() val extractor = new VarOccursRecordExtractor(rc) diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala index 3a0476784..d4cf96eae 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala @@ -151,7 +151,10 @@ class VariableBlockVariableRecordExtractorSuite extends AnyWordSpec { val bdwDecoder = new 
RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(bdwBigEndian, bdwAdjustment)) val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(rdwBigEndian, rdwAdjustment)) - RawRecordContext(0, ibs, hbs, copybook, rdwDecoder, bdwDecoder, "") + RawRecordContext.builder(0L, ibs, hbs, copybook) + .withRdwDecoder(rdwDecoder) + .withBdwDecoder(bdwDecoder) + .build() } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala index 2ca8c9e15..7b1dda346 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala @@ -89,7 +89,7 @@ class VRLRecordReaderSpec extends AnyWordSpec { "work for custom record extractor" in { val stream = new ByteStreamMock(customHeaderRecords) - val context = RawRecordContext(0, stream, stream, null, null, null, "") + val context = RawRecordContext.builder(stream, null).build() val reader = getUseCase( records = customHeaderRecords, @@ -127,9 +127,8 @@ class VRLRecordReaderSpec extends AnyWordSpec { 0x00, 0x07, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8 ).map(_.toByte) - val streamH = new ByteStreamMock(records) - val streamD = new ByteStreamMock(records) - val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "") + val stream = new ByteStreamMock(records) + val context = RawRecordContext.builder(stream, CopybookParser.parseSimple(copybookWithFieldLength)).build() val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN")) @@ -170,9 +169,8 @@ class VRLRecordReaderSpec extends AnyWordSpec { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF1, 0xF5, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8 ).map(_.toByte) - val streamH = new ByteStreamMock(records) - val streamD = new ByteStreamMock(records) - val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "") + val stream = new ByteStreamMock(records) + val context = RawRecordContext.builder(stream, CopybookParser.parseSimple(copybookWithFieldLength)).build() val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN")) @@ -214,9 +212,8 @@ class VRLRecordReaderSpec extends AnyWordSpec { 0xC3, 0xF5, 0xF6, 0xC4, 0xC5, 0xC6 ).map(_.toByte) - val streamH = new ByteStreamMock(records) - val streamD = new ByteStreamMock(records) - val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithLenbgthMap), null, null, "") + val stream = new ByteStreamMock(records) + val context = RawRecordContext.builder(stream, CopybookParser.parseSimple(copybookWithLenbgthMap)).build() val readerParameters = ReaderParameters( lengthFieldExpression = Some("LEN_SPEC"), @@ -266,9 +263,8 @@ class VRLRecordReaderSpec extends AnyWordSpec { 0x00, 0x08, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8 ).map(_.toByte) - val streamH = new ByteStreamMock(records) - val streamD = new ByteStreamMock(records) - val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "") + val stream = new ByteStreamMock(records) + val context = RawRecordContext.builder(stream, CopybookParser.parseSimple(copybookWithFieldLength)).build() val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN - 1")) diff 
--git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala index 27084d8e9..2bd9867d0 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala @@ -163,7 +163,12 @@ private[source] object IndexBuilder extends Logging { logger.info(s"Going to generate index for the file: $filePath") val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config) - val index = reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian) + val index = try { + reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian) + } finally { + inputStream.close() + headerStream.close() + } val indexWithEndOffset = if (maximumBytes > 0 ){ index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala index 977d900e7..7947b285f 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala @@ -26,6 +26,7 @@ import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecor class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { CustomRecordExtractorMock.additionalInfo = ctx.additionalInfo CustomRecordExtractorMock.catchContext = ctx + CustomRecordExtractorMock.options = ctx.options private var recordNumber = ctx.startingRecordNumber @@ -54,4 +55,5 @@ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with object CustomRecordExtractorMock { var additionalInfo: String = "" var catchContext: RawRecordContext = _ + var options = Map.empty[String, String] } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala index ec7906722..1ac28f546 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala @@ -44,7 +44,7 @@ class Test21VariableOccurs extends AnyFunSuite with SparkTestBase { val headerStream = new FSStream(s"$inputDataPath/data.dat") val copybookContents = Files.readAllLines(Paths.get("../data/test21_copybook.cob"), StandardCharsets.ISO_8859_1).toArray.mkString("\n") val copybook = CopybookParser.parse(copybookContents, ASCII) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, headerStream, copybook, null, null, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext.builder(0L, inputStream, headerStream, copybook).build()) val expectedRecords = ListBuffer(Array(48.toByte), Array(49.toByte, 48.toByte), diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala index 
1c9a195bd..64ed65a13 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala @@ -67,7 +67,7 @@ class Test25OccursMappings extends AnyFunSuite with SparkTestBase { ) val copybook = CopybookParser.parse(copybookContents, ASCII, occursHandlers = occursMapping) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, headerStream, copybook, null, null, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext.builder(0L, inputStream, headerStream, copybook).build()) val expectedRecords = ListBuffer( "1AX".getBytes, diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala index e9195d2d2..753f5260e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala @@ -72,6 +72,8 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi assert(actual == expected) assert(CustomRecordExtractorMock.additionalInfo == "re info") assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.inputStream) + assert(CustomRecordExtractorMock.options.contains("schema_retention_policy")) + assert(CustomRecordExtractorMock.options("maximum_record_length") == "2") } }
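
Below is a minimal usage sketch of the new CobolProcessor builder API introduced by this change. The signatures mirror CobolProcessorBuilderSuite above; the copybook, the option value, and the pass-through record processor are illustrative placeholders, and the caller is assumed to supply a SimpleStream over the input data.

import java.io.ByteArrayOutputStream

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

object CobolProcessorUsageSketch {
  private val copybook =
    """      01  RECORD.
      |          05  T    PIC X.
      |""".stripMargin

  // Pass-through processor: returns every raw record unchanged.
  private val identityProcessor = new RawRecordProcessor {
    override def processRecord(copybook: Copybook,
                               options: Map[String, String],
                               record: Array[Byte],
                               offset: Long): Array[Byte] = record
  }

  def run(inputStream: SimpleStream): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()

    val processor = CobolProcessor.builder(copybook)
      .option("record_format", "F") // option keys are the same as 'spark-cobol' options (illustrative)
      .build()

    // Each raw record is passed through identityProcessor and written back in the original format.
    processor.process(inputStream, outputStream)(identityProcessor)

    outputStream.toByteArray
  }
}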
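
Similarly, a sketch of the new RawRecordContext.builder that this diff uses to replace direct constructor calls throughout the test suites. The decoder, additional info, and options shown are illustrative overrides of the builder's defaults (big-endian RDW/BDW decoders, empty options); withReaderParams derives them from a ReaderParameters instance instead, as done in VarLenNestedReader.

import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordContext
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderRdw, RecordHeaderParameters}
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

object RawRecordContextUsageSketch {
  // Explicit overrides of selected fields; everything else keeps the builder's defaults.
  def withExplicitDecoder(dataStream: SimpleStream,
                          headerStream: SimpleStream,
                          copybook: Copybook): RawRecordContext =
    RawRecordContext.builder(0L, dataStream, headerStream, copybook)
      .withRdwDecoder(new RecordHeaderDecoderRdw(RecordHeaderParameters(isBigEndian = false, 0)))
      .withAdditionalInfo("custom-info")
      .withOptions(Map("record_format" -> "V"))
      .build()

  // Derive decoders, additional info, and options from ReaderParameters instead.
  def fromReaderParameters(dataStream: SimpleStream,
                           headerStream: SimpleStream,
                           copybook: Copybook,
                           readerParameters: ReaderParameters): RawRecordContext =
    RawRecordContext.builder(0L, dataStream, headerStream, copybook)
      .withReaderParams(readerParameters)
      .build()
}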