diff --git a/README.md b/README.md index 2b6e20508..07b406fc0 100644 --- a/README.md +++ b/README.md @@ -1668,15 +1668,17 @@ The EBCDIC processor allows processing files by replacing value of fields withou The processing does not require Spark. A processing application can have only the COBOL parser as a dependency (`cobol-parser`). -Here is an example usage: +Here is an example usage (using streams of bytes): ```scala val is = new FSStream(inputFile) val os = new FileOutputStream(outputFile) -val copybookContents = "...some copybook..." -val builder = CobolProcessor.builder(copybookContents) +val builder = CobolProcessor.builder + .withCopybookContents("...some copybook...") + val processor = new RawRecordProcessor { - override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = { + override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = { // The transformation logic goes here val value = ctx.copybook.getFieldValueByName("some_field", record, 0) // Change the field value @@ -1688,10 +1690,53 @@ val processor = new RawRecordProcessor { } } -builder.build().process(is, os)(processor) +val count = builder.build().process(is, os)(processor) ``` + +Here is an example usage (using paths): +```scala +val count = CobolProcessor.builder + .withCopybookContents("...some copybook...") + .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) => + // The transformation logic goes here + val value = ctx.copybook.getFieldValueByName("some_field", record, 0) + // Change the field value + // val newValue = ... + // Write the changed value back + ctx.copybook.setFieldValueByName("some_field", record, newValue, 0) + // Return the changed record + record + } + .load(inputFile) + .save(outputFile) ``` +## EBCDIC Spark Processor (experimental) +This allows in-place processing of data, retaining the original format, in parallel using RDDs under the hood. + +Here is an example usage: +```scala +import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor + +val copybookContents = "...some copybook..." + +SparkCobolProcessor.builder + .withCopybookContents(copybookContents) + .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) => + // The transformation logic goes here + val value = ctx.copybook.getFieldValueByName("some_field", record, 0) + // Change the field value + // val newValue = ... + // Write the changed value back + ctx.copybook.setFieldValueByName("some_field", record, newValue, 0) + // Return the changed record + record + } + .load(inputPath) + .save(outputPath) +``` + ## EBCDIC Writer (experimental) Cobrix's EBCDIC writer is an experimental feature that allows writing Spark DataFrames as EBCDIC mainframe files. 
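To make the Spark example above fully self-contained, here is a minimal sketch that also exercises the `withMultithreaded(...)` and `option(...)` builder methods introduced in this change. The input/output paths, the `record_format` value, and the pass-through transformation are illustrative assumptions; only the builder API itself comes from the PR.

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, SerializableRawRecordProcessor}
import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

val copybookContents = "...some copybook..."

// The processor must be serializable because it is shipped to Spark executors.
val passThroughProcessor = new SerializableRawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // Hypothetical transformation: return the record unchanged.
    record
  }
}

val recordCount = SparkCobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor(passThroughProcessor)
  .withMultithreaded(4)            // process up to 4 files concurrently within each Spark task
  .option("record_format", "F")    // reader options are the same as for 'spark-cobol' (value assumed here)
  .load("/path/to/input")          // hypothetical input path
  .save("/path/to/output")         // hypothetical output directory

println(s"Processed $recordCount records")
```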
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala index 6d7a71e02..ea77277d0 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala @@ -16,14 +16,13 @@ package za.co.absa.cobrix.cobol.processor -import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor} -import za.co.absa.cobrix.cobol.reader.VarLenNestedReader -import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor +import za.co.absa.cobrix.cobol.parser.Copybook +import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema -import za.co.absa.cobrix.cobol.reader.stream.SimpleStream +import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream} -import java.io.OutputStream +import java.io.{BufferedInputStream, BufferedOutputStream, FileOutputStream, OutputStream} import scala.collection.mutable @@ -47,32 +46,58 @@ trait CobolProcessor { } object CobolProcessor { - class CobolProcessorBuilder(copybookContents: String) { + class CobolProcessorBuilder { private val caseInsensitiveOptions = new mutable.HashMap[String, String]() + private var copybookContentsOpt: Option[String] = None + private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None def build(): CobolProcessor = { + if (copybookContentsOpt.isEmpty) { + throw new IllegalArgumentException("Copybook contents must be provided.") + } + val readerParameters = getReaderParameters val cobolSchema = getCobolSchema(readerParameters) - new CobolProcessor { - override def process(inputStream: SimpleStream, - outputStream: OutputStream) - (rawRecordProcessor: RawRecordProcessor): Long = { - val recordExtractor = getRecordExtractor(readerParameters, inputStream) - - val dataStream = inputStream.copyStream() - try { - StreamProcessor.processStream(cobolSchema.copybook, - caseInsensitiveOptions.toMap, - dataStream, - recordExtractor, - rawRecordProcessor, - outputStream) - } finally { - dataStream.close() - } - } + new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap) + } + + def load(path: String): CobolProcessorLoader = { + val file = new java.io.File(path) + if (!file.exists) { + throw new IllegalArgumentException(s"Path $path does not exist.") + } + + if (file.isDirectory) { + throw new IllegalArgumentException(s"Path $path should be a file, not a directory.") + } + + if (copybookContentsOpt.isEmpty) { + throw new IllegalArgumentException("Copybook contents must be provided.") + } + + if (rawRecordProcessorOpt.isEmpty) { + throw new IllegalArgumentException("A RawRecordProcessor must be provided.") + } + + val readerParameters = getReaderParameters + val cobolSchema = getCobolSchema(readerParameters) + + new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, caseInsensitiveOptions.toMap) + } + + def withCopybookContents(copybookContents: String): CobolProcessorBuilder = { + copybookContentsOpt = Option(copybookContents) + this + } + + def withRecordProcessor(processor: 
RawRecordProcessor): CobolProcessorBuilder = { + rawRecordProcessorOpt = Option(processor) + this } /** @@ -100,7 +125,7 @@ object CobolProcessor { } private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = { - CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters) + CobolSchema.fromReaderParameters(Seq(copybookContentsOpt.get), readerParameters) } private[processor] def getReaderParameters: ReaderParameters = { @@ -109,25 +134,60 @@ object CobolProcessor { CobolParametersParser.getReaderProperties(cobolParameters, None) } - private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = { - val dataStream = inputStream.copyStream() - val headerStream = inputStream.copyStream() + private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap + } - val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler) + class CobolProcessorLoader(fileToProcess: String, + copybookContents: String, + copybook: Copybook, + rawRecordProcessor: RawRecordProcessor, + readerParameters: ReaderParameters, + options: Map[String, String]) { + def save(outputFile: String): Long = { + val processor = new CobolProcessorImpl(readerParameters, copybook, copybookContents, options) + + val ifs = new FSStream(fileToProcess) + val ofs = new BufferedOutputStream(new FileOutputStream(outputFile)) + + var originalException: Throwable = null + + val recordCount = try { + processor.process(ifs, ofs)(rawRecordProcessor) + } catch { + case ex: Throwable => + originalException = ex + 0L + } finally { + try { + ifs.close() + } catch { + case e: Throwable => + if (originalException != null) { + originalException.addSuppressed(e) + } else { + originalException = e + } + } - reader.recordExtractor(0, dataStream, headerStream) match { - case Some(extractor) => extractor - case None => - throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " + - "Please check the copybook and the reader parameters." - ) + try { + ofs.close() + } catch { + case e: Throwable => + if (originalException != null) { + originalException.addSuppressed(e) + } else { + originalException = e + } + } } - } - private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap + if (originalException != null) throw originalException + + recordCount + } } - def builder(copybookContent: String): CobolProcessorBuilder = { - new CobolProcessorBuilder(copybookContent) + def builder: CobolProcessorBuilder = { + new CobolProcessorBuilder } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala new file mode 100644 index 000000000..2fe917cb0 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.processor + +import za.co.absa.cobrix.cobol.parser.Copybook + +case class CobolProcessorContext(copybook: Copybook, + options: Map[String, String], + currentOffset: Long) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala index e56f5b68a..41b17b901 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala @@ -16,16 +16,10 @@ package za.co.absa.cobrix.cobol.processor -import za.co.absa.cobrix.cobol.parser.Copybook - /** * A trait that defines a processor for raw COBOL records. * It provides a method to process a single COBOL record based on the provided copybook and options. */ trait RawRecordProcessor { - def processRecord(copybook: Copybook, - options: Map[String, String], - record: Array[Byte], - offset: Long): Array[Byte] - + def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala new file mode 100644 index 000000000..a0ef2ba1a --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/SerializableRawRecordProcessor.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.processor + +/** + * A serializable version of RawRecordProcessor for distributed processing in Spark. + * + * Usage patterns: + * - For standalone JVM applications: Use CobolProcessor with RawRecordProcessor + * - For Spark applications: Use SparkCobolProcessor with SerializableRawRecordProcessor + * + * This trait extends Serializable since Spark distributes processing code across the network + * to worker nodes, requiring all components to be serializable. + */ +trait SerializableRawRecordProcessor extends RawRecordProcessor with Serializable diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala new file mode 100644 index 000000000..9531f4d29 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.processor.impl + +import za.co.absa.cobrix.cobol.parser.Copybook +import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor} +import za.co.absa.cobrix.cobol.reader.VarLenNestedReader +import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream + +import java.io.OutputStream + +/** + * Implementation of the CobolProcessor trait, responsible for processing COBOL data streams + * by extracting records and applying a user-defined raw record processor. + * + * The processing can be done from inside an RDD so this is why it is serializable. + * + * Please, do not use this class directly. Use `CobolProcessor.builder()` instead. + * + * @param readerParameters Configuration for record extraction and COBOL file parsing. + * @param copybook The copybook definition used for interpreting COBOL data structures. + * @param copybookContents The raw textual representation of the copybook. + * @param options A map of processing options to customize the behavior of the processor (same as for `spark-cobol`). + */ +class CobolProcessorImpl(readerParameters: ReaderParameters, + copybook: Copybook, + copybookContents: String, + options: Map[String, String]) extends CobolProcessor with Serializable { + override def process(inputStream: SimpleStream, + outputStream: OutputStream) + (rawRecordProcessor: RawRecordProcessor): Long = { + val recordExtractor = getRecordExtractor(readerParameters, inputStream) + + val dataStream = inputStream.copyStream() + try { + StreamProcessor.processStream(copybook, + options, + dataStream, + recordExtractor, + rawRecordProcessor, + outputStream) + } finally { + dataStream.close() + } + } + + private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = { + val dataStream = inputStream.copyStream() + val headerStream = inputStream.copyStream() + + val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler) + + reader.recordExtractor(0, dataStream, headerStream) match { + case Some(extractor) => extractor + case None => + throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " + + "Please check the copybook and the reader parameters." 
+ ) + } + } +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala index 10d0e170e..488bef057 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala @@ -17,7 +17,7 @@ package za.co.absa.cobrix.cobol.processor.impl import za.co.absa.cobrix.cobol.parser.Copybook -import za.co.absa.cobrix.cobol.processor.RawRecordProcessor +import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, RawRecordProcessor} import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor import za.co.absa.cobrix.cobol.reader.stream.SimpleStream @@ -48,7 +48,9 @@ object StreamProcessor { val record = recordExtractor.next() val recordSize = record.length - val updatedRecord = recordProcessor.processRecord(copybook, options, record, recordExtractor.offset) + val ctx = CobolProcessorContext(copybook, options, recordExtractor.offset) + + val updatedRecord = recordProcessor.processRecord(record, ctx) val headerSize = recordExtractor.offset - recordSize - inputStream.offset if (headerSize > 0) { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala new file mode 100644 index 000000000..721859472 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.base + +import java.nio.file.{Files, Path, Paths} +import java.util.Comparator +import java.util.function.Consumer + +/** + * This fixture adds ability for a unit test to create temporary files for using them in the tests. 
+ */ +trait BinaryFileFixture { + def withTempDirectory(prefix: String)(f: String => Unit): Unit = { + val tmpPath = Files.createTempDirectory(prefix) + val pathStr = tmpPath.toAbsolutePath.toString + + f(pathStr) + + Files.walk(tmpPath) + .sorted(Comparator.reverseOrder()) + .forEach(new Consumer[Path] { + override def accept(f: Path): Unit = Files.delete(f) + }) + } + + def writeBinaryFile(filePath: String, content: Array[Byte]): Unit = { + Files.write(Paths.get(filePath), content) + } + + def readBinaryFile(filePath: String): Array[Byte] = { + Files.readAllBytes(Paths.get(filePath)) + } + + private def hex2bytes(hex: String): Array[Byte] = { + val compactStr = hex.replaceAll("\\s", "") + compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte) + } +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala index 7ec9d08ba..c6ec13724 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala @@ -17,27 +17,29 @@ package za.co.absa.cobrix.cobol.processor import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.base.BinaryFileFixture import za.co.absa.cobrix.cobol.mock.ByteStreamMock -import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat -import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor} import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import java.io.ByteArrayOutputStream +import java.nio.file.Paths -class CobolProcessorBuilderSuite extends AnyWordSpec { +class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture { private val copybook = """ 01 RECORD. | 05 T PIC X. 
|""".stripMargin + "process" should { "process an input data stream into an output stream" in { val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) val os = new ByteArrayOutputStream(10) - val builder = CobolProcessor.builder(copybook) + val builder = CobolProcessor.builder + .withCopybookContents(copybook) val processor = new RawRecordProcessor { - override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = { + override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = { record.map(v => (v - 1).toByte) } } @@ -54,9 +56,39 @@ class CobolProcessorBuilderSuite extends AnyWordSpec { } } + "load and save" should { + "process files as expected" in { + withTempDirectory("cobol_processor") { tempDir => + val inputFile = Paths.get(tempDir, "input.dat").toString + val outputFile = Paths.get(tempDir, "output.dat").toString + + writeBinaryFile(inputFile, Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) + + val count = CobolProcessor.builder + .withCopybookContents(copybook) + .withRecordProcessor(new RawRecordProcessor { + override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = { + record.map(v => (v - 1).toByte) + } + }) + .load(inputFile) + .save(outputFile) + + val outputArray = readBinaryFile(outputFile) + + assert(count == 4) + assert(outputArray.head == -16) + assert(outputArray(1) == -15) + assert(outputArray(2) == -14) + assert(outputArray(3) == -13) + } + } + } + "getCobolSchema" should { "return the schema of the copybook provided" in { - val builder = CobolProcessor.builder(copybook) + val builder = CobolProcessor.builder + .withCopybookContents(copybook) val cobolSchema = builder.getCobolSchema(ReaderParameters()) @@ -66,7 +98,8 @@ class CobolProcessorBuilderSuite extends AnyWordSpec { "getReaderParameters" should { "return a reader according to passed options" in { - val builder = CobolProcessor.builder(copybook) + val builder = CobolProcessor.builder + .withCopybookContents(copybook) .option("record_format", "D") assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText) @@ -74,47 +107,4 @@ class CobolProcessorBuilderSuite extends AnyWordSpec { assert(builder.getOptions.contains("record_format")) } } - - "getRecordExtractor" should { - "work for an fixed-record-length files" in { - val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = CobolProcessor.builder(copybook) - - val ext = builder.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream) - - assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor]) - - assert(ext.hasNext) - assert(ext.next().sameElements(Array(0xF1, 0xF2).map(_.toByte))) - assert(ext.next().sameElements(Array(0xF3, 0xF4).map(_.toByte))) - assert(!ext.hasNext) - } - - "work for an variable-record-length files" in { - val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = CobolProcessor.builder(copybook) - - val ext = builder.getRecordExtractor(ReaderParameters( - recordFormat = RecordFormat.VariableLength, - isText = true - ), stream) - - assert(ext.isInstanceOf[TextFullRecordExtractor]) - } - - "throw an exception on a non-supported record format for processing" in { - val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) - val builder = CobolProcessor.builder(copybook) - - val ex = intercept[IllegalArgumentException] { - 
builder.getRecordExtractor(ReaderParameters( - recordFormat = RecordFormat.VariableLength, - isRecordSequence = true - ), stream) - } - - assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters.")) - } - } - } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala new file mode 100644 index 000000000..93408c63d --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala @@ -0,0 +1,79 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.processor.impl + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.mock.ByteStreamMock +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat +import za.co.absa.cobrix.cobol.processor.CobolProcessor +import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor} +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters + +class CobolProcessorImplSuite extends AnyWordSpec { + private val copybook = + """ 01 RECORD. + | 05 T PIC X. 
+ |""".stripMargin + + "getRecordExtractor" should { + "work for an fixed-record-length files" in { + val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) + val processor = CobolProcessor.builder + .withCopybookContents(copybook) + .build().asInstanceOf[CobolProcessorImpl] + + val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream) + + assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor]) + + assert(ext.hasNext) + assert(ext.next().sameElements(Array(0xF1, 0xF2).map(_.toByte))) + assert(ext.next().sameElements(Array(0xF3, 0xF4).map(_.toByte))) + assert(!ext.hasNext) + } + + "work for an variable-record-length files" in { + val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) + val processor = CobolProcessor.builder + .withCopybookContents(copybook) + .build().asInstanceOf[CobolProcessorImpl] + + val ext = processor.getRecordExtractor(ReaderParameters( + recordFormat = RecordFormat.VariableLength, + isText = true + ), stream) + + assert(ext.isInstanceOf[TextFullRecordExtractor]) + } + + "throw an exception on a non-supported record format for processing" in { + val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)) + val processor = CobolProcessor.builder + .withCopybookContents(copybook) + .build().asInstanceOf[CobolProcessorImpl] + + val ex = intercept[IllegalArgumentException] { + processor.getRecordExtractor(ReaderParameters( + recordFormat = RecordFormat.VariableLength, + isRecordSequence = true + ), stream) + } + + assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters.")) + } + } +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala new file mode 100644 index 000000000..b98967d1c --- /dev/null +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala @@ -0,0 +1,244 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol + +import org.apache.hadoop.fs.Path +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.slf4j.LoggerFactory +import za.co.absa.cobrix.cobol.processor.{CobolProcessor, SerializableRawRecordProcessor} +import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration +import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer +import za.co.absa.cobrix.spark.cobol.utils.FileUtils + +import java.io.BufferedOutputStream +import java.util.concurrent.{ExecutorService, Executors} +import scala.collection.mutable +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +trait SparkCobolProcessor { + /** + * Runs raw record processing and returns the record count processed. + * + * @param listOfFiles A sequence of input file paths to process. 
+ * @param outputPath The path where processed records will be written. + * @return The number of records that were processed + */ + def process(listOfFiles: Seq[String], outputPath: String): Long +} + +object SparkCobolProcessor { + private val log = LoggerFactory.getLogger(this.getClass) + + class SparkCobolProcessorBuilder(implicit spark: SparkSession) { + private val caseInsensitiveOptions = new mutable.HashMap[String, String]() + private var copybookContentsOpt: Option[String] = None + private var rawRecordProcessorOpt: Option[SerializableRawRecordProcessor] = None + private var numberOfThreads: Int = 1 + + def load(path: String): SparkCobolProcessorLoader = { + val filePaths = FileUtils + .getFiles(path, spark.sparkContext.hadoopConfiguration) + + load(filePaths) + } + + def load(filePaths: Seq[String]): SparkCobolProcessorLoader = { + if (copybookContentsOpt.isEmpty) { + throw new IllegalArgumentException("Copybook contents must be provided.") + } + + if (rawRecordProcessorOpt.isEmpty) { + throw new IllegalArgumentException("A RawRecordProcessor must be provided.") + } + + if (numberOfThreads < 1) { + throw new IllegalArgumentException("Number of threads must be at least 1.") + } + + if (filePaths.isEmpty) { + throw new IllegalArgumentException("At least one input file must be provided.") + } + + new SparkCobolProcessorLoader(filePaths, copybookContentsOpt.get, rawRecordProcessorOpt.get, numberOfThreads, caseInsensitiveOptions.toMap) + } + + def withCopybookContents(copybookContents: String): SparkCobolProcessorBuilder = { + copybookContentsOpt = Option(copybookContents) + this + } + + def withRecordProcessor(processor: SerializableRawRecordProcessor): SparkCobolProcessorBuilder = { + rawRecordProcessorOpt = Option(processor) + this + } + + def withMultithreaded(numberOfThreads: Int): SparkCobolProcessorBuilder = { + if (numberOfThreads < 1) { + throw new IllegalArgumentException("Number of threads must be at least 1.") + } + this.numberOfThreads = numberOfThreads + this + } + + /** + * Adds a single option to the builder. + * + * @param key the option key. + * @param value the option value. + * @return this builder instance for method chaining. + */ + def option(key: String, value: String): SparkCobolProcessorBuilder = { + require(key.trim.nonEmpty, "Option key must not be empty or whitespace-only") + caseInsensitiveOptions += (key.trim.toLowerCase -> value) + this + } + + /** + * Adds multiple options to the builder. + * + * @param options a map of option key-value pairs. + * @return this builder instance for method chaining. 
+ */ + def options(options: Map[String, String]): SparkCobolProcessorBuilder = { + caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2)) + this + } + } + + class SparkCobolProcessorLoader(filesToRead: Seq[String], + copybookContents: String, + rawRecordProcessor: SerializableRawRecordProcessor, + numberOfThreads: Int, + options: Map[String, String]) + (implicit spark: SparkSession) { + def save(outputPath: String): Long = { + val cobolProcessor = CobolProcessor.builder + .withCopybookContents(copybookContents) + .options(options) + .build() + + val processor = new SparkCobolProcessor { + private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration) + + override def process(listOfFiles: Seq[String], outputPath: String): Long = { + getFileProcessorRdd(listOfFiles, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads) + .reduce(_ + _) + } + } + + log.info(s"Writing to $outputPath...") + processor.process(filesToRead, outputPath) + } + } + + def builder(implicit spark: SparkSession): SparkCobolProcessorBuilder = { + new SparkCobolProcessorBuilder + } + + private def getFileProcessorRdd(listOfFiles: Seq[String], + outputPath: String, + copybookContents: String, + cobolProcessor: CobolProcessor, + rawRecordProcessor: SerializableRawRecordProcessor, + sconf: SerializableConfiguration, + numberOfThreads: Int + )(implicit spark: SparkSession): RDD[Long] = { + val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq + val rdd = spark.sparkContext.parallelize(groupedFiles) + rdd.map(group => { + processListOfFiles(group, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads) + }) + } + + private def processListOfFiles(listOfFiles: Seq[String], + outputPath: String, + copybookContents: String, + cobolProcessor: CobolProcessor, + rawRecordProcessor: SerializableRawRecordProcessor, + sconf: SerializableConfiguration, + numberOfThreads: Int + ): Long = { + val threadPool: ExecutorService = Executors.newFixedThreadPool(numberOfThreads) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(threadPool) + + val futures = listOfFiles.map { inputFile => + val fileName = new Path(inputFile).getName + val outputPathFileName = new Path(outputPath, fileName).toString + + log.info(s"Processing file: $inputFile -> $outputPathFileName") + + Future { + val hadoopConfig = sconf.value + val inputFs = new Path(inputFile).getFileSystem(hadoopConfig) + val ifs = new FileStreamer(inputFile, inputFs) + val outputFile = new Path(outputPath, fileName) + val outputFs = outputFile.getFileSystem(hadoopConfig) + val ofs = new BufferedOutputStream(outputFs.create(outputFile, true)) + + var originalException: Throwable = null + + val recordCount = try { + cobolProcessor.process(ifs, ofs)(rawRecordProcessor) + } catch { + case ex: Throwable => + originalException = ex + 0L + } finally { + // Close both streams, ensuring no exception escapes unnoticed and the original exception is preserved. 
+ try { + ifs.close() + } catch { + case e: Throwable => + if (originalException != null) { + originalException.addSuppressed(e) + } else { + originalException = e + } + } + + try { + ofs.close() + } catch { + case e: Throwable => + if (originalException != null) { + originalException.addSuppressed(e) + } else { + originalException = e + } + } + } + + if (originalException != null) throw originalException + + log.info(s"Writing to $outputFile succeeded!") + recordCount + } + }.toSeq + + val seq = Future.sequence(futures) + + val recordCountProcessed = try { + Await.result(seq, Duration.Inf).sum + } finally { + threadPool.shutdown() + } + + recordCountProcessed + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala new file mode 100644 index 000000000..1028fa016 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala @@ -0,0 +1,108 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol + +import org.apache.hadoop.fs.Path +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, SerializableRawRecordProcessor} +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} + +class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { + private val copybook = + """ 01 RECORD. + | 05 T PIC X. 
+ |""".stripMargin + + private val rawRecordProcessor = new SerializableRawRecordProcessor { + override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = { + record.map(v => (v - 1).toByte) + } + } + + "SparkCobolProcessor" should { + "fail to create when a copybook is not specified" in { + val exception = intercept[IllegalArgumentException] { + SparkCobolProcessor.builder.load(".") + } + + assert(exception.getMessage.contains("Copybook contents must be provided.")) + } + + "fail to create when a record processor is not provided" in { + val exception = intercept[IllegalArgumentException] { + SparkCobolProcessor.builder + .withCopybookContents(copybook).load(".") + } + + assert(exception.getMessage.contains("A RawRecordProcessor must be provided.")) + } + + "fail to create when the number of threads is less than 0" in { + val exception = intercept[IllegalArgumentException] { + SparkCobolProcessor.builder + .withCopybookContents(copybook) + .withRecordProcessor(rawRecordProcessor) + .withMultithreaded(0) + .load("") + } + + assert(exception.getMessage.contains("Number of threads must be at least 1.")) + } + + "fail when no files are provided" in { + val exception = intercept[IllegalArgumentException] { + SparkCobolProcessor.builder + .withCopybookContents(copybook) + .withRecordProcessor(rawRecordProcessor) + .load(Seq.empty) + } + + assert(exception.getMessage.contains("At least one input file must be provided.")) + } + + "process files via an RDD" in { + withTempDirectory("spark_cobol_processor") { tempDir => + val binData = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte) + + val inputPath = new Path(tempDir, "input.dat").toString + val outputPath = new Path(tempDir, "output").toString + val outputFile = new Path(outputPath, "input.dat").toString + + writeBinaryFile(inputPath, binData) + + SparkCobolProcessor.builder + .withCopybookContents(copybook) + .withRecordProcessor (new SerializableRawRecordProcessor { + override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = { + record.map(v => (v - 1).toByte) + } + }) + .load(inputPath) + .save(outputPath) + + val outputData = readBinaryFile(outputFile) + + assert(outputData.length == binData.length) + assert(outputData.head == 0xF0.toByte) + assert(outputData(1) == 0xF1.toByte) + assert(outputData(2) == 0xF2.toByte) + assert(outputData(3) == 0xF3.toByte) + } + } + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala index b84543895..2e47ed81e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala @@ -21,7 +21,7 @@ import org.apache.commons.io.FileUtils import java.io.File.createTempFile import java.io.{DataOutputStream, File, FileOutputStream} import java.nio.charset.Charset -import java.nio.file.Files +import java.nio.file.{Files, Paths} /** * This fixture adds ability for a unit test to create temporary files for using them in the tests. 
@@ -113,6 +113,14 @@ trait BinaryFileFixture { tempFile } + def writeBinaryFile(filePath: String, content: Array[Byte]): Unit = { + Files.write(Paths.get(filePath), content) + } + + def readBinaryFile(filePath: String): Array[Byte] = { + FileUtils.readFileToByteArray(new File(filePath)) + } + private def hex2bytes(hex: String): Array[Byte] = { val compactStr = hex.replaceAll("\\s", "") compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)