Skip to content

Commit bc8c400

Browse files
committed
#780 Refactor the cobol processor ot use the proper builder pattern.
1 parent 140942e commit bc8c400

File tree

6 files changed

+147
-138
lines changed

6 files changed

+147
-138
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength
20+
import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor}
21+
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
22+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, RawRecordContext, RawRecordExtractor}
23+
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
24+
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
25+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
26+
27+
import java.io.OutputStream
28+
import scala.collection.mutable
29+
30+
31+
/**
32+
* A trait that defines a processor for raw COBOL data streams.
33+
* It provides a method to process a COBOL file or a stream, provided record processor.
34+
*/
35+
trait CobolProcessor {
36+
/**
37+
* Processes the input stream of COBOL records and writes the output to the specified output stream.
38+
*
39+
* @param inputStream the input stream containing raw COBOL records.
40+
* @param outputStream the output stream where processed records will be written.
41+
* @param rawRecordProcessor the processor that processes each raw record.
42+
*/
43+
def process(inputStream: SimpleStream,
44+
outputStream: OutputStream)
45+
(rawRecordProcessor: RawRecordProcessor): Unit
46+
47+
}
48+
49+
object CobolProcessor {
50+
class CobolProcessorBuilder(copybookContents: String) {
51+
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
52+
53+
def build(): CobolProcessor = {
54+
new CobolProcessor {
55+
override def process(inputStream: SimpleStream,
56+
outputStream: OutputStream)
57+
(rawRecordProcessor: RawRecordProcessor): Unit = {
58+
val readerParameters = getReaderParameters
59+
val cobolSchema = getCobolSchema(readerParameters)
60+
val recordExtractor = getRecordExtractor(readerParameters, inputStream)
61+
62+
val dataStream = inputStream.copyStream()
63+
try {
64+
StreamProcessor.processStream(cobolSchema.copybook,
65+
caseInsensitiveOptions.toMap,
66+
dataStream,
67+
recordExtractor,
68+
rawRecordProcessor,
69+
outputStream)
70+
} finally {
71+
dataStream.close()
72+
}
73+
}
74+
}
75+
}
76+
77+
/**
78+
* Adds a single option to the builder.
79+
*
80+
* @param key the option key.
81+
* @param value the option value.
82+
* @return this builder instance for method chaining.
83+
*/
84+
def option(key: String, value: String): CobolProcessorBuilder = {
85+
caseInsensitiveOptions += (key.toLowerCase -> value)
86+
this
87+
}
88+
89+
/**
90+
* Adds multiple options to the builder.
91+
*
92+
* @param options a map of option key-value pairs.
93+
* @return this builder instance for method chaining.
94+
*/
95+
def options(options: Map[String, String]): CobolProcessorBuilder = {
96+
caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
97+
this
98+
}
99+
100+
private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
101+
CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters)
102+
}
103+
104+
private[processor] def getReaderParameters: ReaderParameters = {
105+
val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap))
106+
107+
CobolParametersParser.getReaderProperties(cobolParameters, None)
108+
}
109+
110+
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
111+
val dataStream = inputStream.copyStream()
112+
val headerStream = inputStream.copyStream()
113+
114+
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
115+
116+
reader.recordExtractor(0, dataStream, headerStream) match {
117+
case Some(extractor) => extractor
118+
case None if readerParameters.recordFormat == FixedLength =>
119+
val dataStream = inputStream.copyStream()
120+
val ctx = RawRecordContext.builder(dataStream, getCobolSchema(readerParameters).copybook).build()
121+
new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
122+
case None =>
123+
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
124+
"Please check the copybook and the reader parameters."
125+
)
126+
}
127+
}
128+
129+
private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap
130+
}
131+
132+
def builder(copybookContent: String): CobolProcessorBuilder = {
133+
new CobolProcessorBuilder(copybookContent)
134+
}
135+
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala

Lines changed: 0 additions & 127 deletions
This file was deleted.

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/ArrayOfAnyHandler.scala renamed to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/ArrayOfAnyHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.cobrix.cobol.processor
17+
package za.co.absa.cobrix.cobol.processor.impl
1818

1919
import za.co.absa.cobrix.cobol.parser.ast.Group
2020
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/MapOfAnyHandler.scala renamed to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/MapOfAnyHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.cobrix.cobol.processor
17+
package za.co.absa.cobrix.cobol.processor.impl
1818

1919
import za.co.absa.cobrix.cobol.parser.ast.Group
2020
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/StreamProcessor.scala renamed to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.cobrix.cobol.processor
17+
package za.co.absa.cobrix.cobol.processor.impl
1818

1919
import za.co.absa.cobrix.cobol.parser.Copybook
20+
import za.co.absa.cobrix.cobol.processor.RawRecordProcessor
2021
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
2122
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
2223

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilderSuite.scala renamed to cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
2525

2626
import java.io.ByteArrayOutputStream
2727

28-
class RecordProcessorBuilderSuite extends AnyWordSpec {
28+
class CobolProcessorBuilderSuite extends AnyWordSpec {
2929
private val copybook =
3030
""" 01 RECORD.
3131
| 05 T PIC X.
@@ -34,15 +34,15 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
3434
"process an input data stream into an output stream" in {
3535
val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
3636
val os = new ByteArrayOutputStream(10)
37-
val builder = RecordProcessorBuilder.copybookContents(copybook)
37+
val builder = CobolProcessor.builder(copybook)
3838

3939
val processor = new RawRecordProcessor {
4040
override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
4141
record.map(v => (v - 1).toByte)
4242
}
4343
}
4444

45-
builder.process(is, os)(processor)
45+
builder.build().process(is, os)(processor)
4646

4747
val outputArray = os.toByteArray
4848

@@ -55,7 +55,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
5555

5656
"getCobolSchema" should {
5757
"return the schema of the copybook provided" in {
58-
val builder = RecordProcessorBuilder.copybookContents(copybook)
58+
val builder = CobolProcessor.builder(copybook)
5959

6060
val cobolSchema = builder.getCobolSchema(ReaderParameters())
6161

@@ -65,7 +65,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
6565

6666
"getReaderParameters" should {
6767
"return a reader according to passed options" in {
68-
val builder = RecordProcessorBuilder.copybookContents(copybook)
68+
val builder = CobolProcessor.builder(copybook)
6969
.option("record_format", "D")
7070

7171
assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
@@ -77,7 +77,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
7777
"getRecordExtractor" should {
7878
"work for an fixed-record-length files" in {
7979
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
80-
val builder = RecordProcessorBuilder.copybookContents(copybook)
80+
val builder = CobolProcessor.builder(copybook)
8181

8282
val ext = builder.getRecordExtractor(ReaderParameters(recordLength = Some(2)), stream)
8383

@@ -91,7 +91,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
9191

9292
"work for an variable-record-length files" in {
9393
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
94-
val builder = RecordProcessorBuilder.copybookContents(copybook)
94+
val builder = CobolProcessor.builder(copybook)
9595

9696
val ext = builder.getRecordExtractor(ReaderParameters(
9797
recordFormat = RecordFormat.VariableLength,
@@ -103,7 +103,7 @@ class RecordProcessorBuilderSuite extends AnyWordSpec {
103103

104104
"throw an exception on a non-supported record format for processing" in {
105105
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
106-
val builder = RecordProcessorBuilder.copybookContents(copybook)
106+
val builder = CobolProcessor.builder(copybook)
107107

108108
val ex = intercept[IllegalArgumentException] {
109109
builder.getRecordExtractor(ReaderParameters(

0 commit comments

Comments
 (0)