Skip to content

Commit 2cad3de

Browse files
committed
#769 Fix the processor not processing the last record, add unit tests.
1 parent a0f4b2a commit 2cad3de

File tree

4 files changed

+194
-10
lines changed

4 files changed

+194
-10
lines changed

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RecordProcessorBuilder.scala

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616

1717
package za.co.absa.cobrix.cobol.processor
1818

19+
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength
1920
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
20-
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
21+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, RawRecordContext, RawRecordExtractor}
2122
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
2223
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
2324
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
@@ -93,22 +94,35 @@ class RecordProcessorBuilder(copybookContents: String) {
9394
}
9495
}
9596

96-
private def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
97+
/**
  * Builds the COBOL schema for this builder's copybook.
  *
  * @param readerParameters reader parameters passed on to the schema factory.
  * @return the schema created from the copybook contents and the reader parameters.
  */
private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
  val copybooks = Seq(copybookContents)
  CobolSchema.fromReaderParameters(copybooks, readerParameters)
}
99100

100-
private def getReaderParameters: ReaderParameters = {
101+
/**
  * Converts the options set on this builder into reader parameters.
  *
  * The options are first parsed into COBOL parameters, which are then turned into
  * reader properties (no second argument value is supplied: `None`).
  *
  * @return reader parameters derived from the builder's options.
  */
private[processor] def getReaderParameters: ReaderParameters = {
  val options = new Parameters(caseInsensitiveOptions.toMap)
  val parsedParameters = CobolParametersParser.parse(options)

  CobolParametersParser.getReaderProperties(parsedParameters, None)
}
105106

106-
private def getRecordExtractor[T: ClassTag](readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
107+
/**
  * Creates a raw record extractor over copies of the given input stream.
  *
  * The extractor offered by `VarLenNestedReader` is used when there is one. When the reader
  * offers none and the record format is `FixedLength`, a `FixedRecordLengthRawRecordExtractor`
  * is created instead, using the copybook of the schema and `readerParameters.recordLength`.
  *
  * @param readerParameters reader parameters that determine which extractor is created.
  * @param inputStream      the source stream; only copies of it are handed to the extractor.
  * @return a raw record extractor reading from a copy of `inputStream`.
  * @throws IllegalArgumentException if no extractor can be created for the given parameters.
  */
private[processor] def getRecordExtractor[T: ClassTag](readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
  val dataStream = inputStream.copyStream()
  val headerStream = inputStream.copyStream()

  val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

  // When the reader returns no extractor, nothing else holds on to the stream copies created
  // above (presumably - confirm against VarLenNestedReader), so they are closed here instead
  // of being silently abandoned.
  def closeUnusedStreams(): Unit = {
    dataStream.close()
    headerStream.close()
  }

  reader.recordExtractor(0, dataStream, headerStream) match {
    case Some(extractor) =>
      extractor
    case None if readerParameters.recordFormat == FixedLength =>
      closeUnusedStreams()
      // Fresh copies are used for the fallback so that it starts from the input stream's current position.
      val fixedLengthDataStream = inputStream.copyStream()
      val fixedLengthHeaderStream = inputStream.copyStream()
      val ctx = RawRecordContext(0, fixedLengthDataStream, fixedLengthHeaderStream, getCobolSchema(readerParameters).copybook, null, null, "")
      new FixedRecordLengthRawRecordExtractor(ctx, readerParameters.recordLength)
    case None =>
      closeUnusedStreams()
      throw new IllegalArgumentException("Cannot create a record extractor for the given reader parameters. " +
        "Please check the copybook and the reader parameters."
      )
  }
}
126+
127+
/** Returns the builder's options (`caseInsensitiveOptions`) converted to a `Map`. */
private[processor] def getOptions: Map[String, String] = {
  caseInsensitiveOptions.toMap
}
114128
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/StreamProcessor.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,7 @@ object StreamProcessor {
4545
val record = recordExtractor.next()
4646
val recordSize = record.length
4747

48-
val updatedRecord = if (recordExtractor.hasNext) {
49-
recordProcessor.processRecord(copybook, options, record, recordExtractor.offset)
50-
} else {
51-
record
52-
}
48+
val updatedRecord = recordProcessor.processRecord(copybook, options, record, recordExtractor.offset)
5349

5450
val headerSize = recordExtractor.offset - recordSize - inputStream.offset
5551
if (headerSize > 0) {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.reader.extractors.raw
18+
19+
/**
  * A raw record extractor that reads records of a fixed size from the input stream of the context.
  *
  * The record size is `fixedRecordLength` when defined, otherwise the record size of the copybook.
  * The header stream of the context is not used and is closed on construction.
  *
  * @param ctx               the raw record context providing the input stream, header stream and copybook.
  * @param fixedRecordLength an optional record length overriding the one derived from the copybook.
  */
class FixedRecordLengthRawRecordExtractor(ctx: RawRecordContext, fixedRecordLength: Option[Int]) extends Serializable with RawRecordExtractor {
  // Starts at the current position of the input stream and advances by the length of each returned record.
  private var byteOffset: Long = ctx.inputStream.offset
  private val recordSize = fixedRecordLength.getOrElse(ctx.copybook.getRecordSize)
  // Holds a record that has been read from the stream but not yet returned by next().
  private var currentRecordOpt: Option[Array[Byte]] = None

  ctx.headerStream.close()

  /** The offset just past the last record returned by `next()`. */
  override def offset: Long = byteOffset

  /** Returns true if another record is available, reading it ahead from the stream if needed. */
  override def hasNext: Boolean = {
    if (currentRecordOpt.isEmpty) {
      fetchRecord()
    }
    currentRecordOpt.isDefined
  }

  /** Reads the next chunk of `recordSize` bytes and buffers it unless it is empty. */
  private def fetchRecord(): Unit = {
    val stream = ctx.inputStream
    if (!stream.isEndOfStream) {
      val bytes = stream.next(recordSize)

      if (bytes.nonEmpty) {
        currentRecordOpt = Some(bytes)
      }
    }
  }

  /**
    * Returns the next record and advances the offset by its length.
    *
    * @throws NoSuchElementException if there are no more records.
    */
  @throws[NoSuchElementException]
  override def next(): Array[Byte] = {
    val pending = if (hasNext) currentRecordOpt else None

    pending match {
      case Some(record) =>
        byteOffset += record.length
        currentRecordOpt = None
        record
      case None =>
        throw new NoSuchElementException
    }
  }
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import org.scalatest.wordspec.AnyWordSpec
20+
import za.co.absa.cobrix.cobol.mock.ByteStreamMock
21+
import za.co.absa.cobrix.cobol.parser.Copybook
22+
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
23+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor}
24+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
25+
26+
import java.io.ByteArrayOutputStream
27+
28+
/**
  * Unit tests for `RecordProcessorBuilder`: end-to-end stream processing and the
  * package-private helpers (`getCobolSchema`, `getReaderParameters`, `getRecordExtractor`).
  */
class RecordProcessorBuilderSuite extends AnyWordSpec {
  // NOTE(review): the leading whitespace inside this literal may have been collapsed in this
  // copy of the file - confirm the column layout against the repository version.
  private val copybook =
    """ 01 RECORD.
      | 05 T PIC X.
      |""".stripMargin

  "process" should {
    "process an input data stream into an output stream" in {
      val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val os = new ByteArrayOutputStream(10)
      val builder = RecordProcessorBuilder.copybookContents(copybook)

      // Decrements every byte of a record so that processed output is distinguishable from raw input.
      val processor = new RawRecordProcessor {
        override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
          record.map(v => (v - 1).toByte)
        }
      }

      builder.process(is, os)(processor)

      val outputArray = os.toByteArray

      // The output must contain exactly one byte per input byte: no dropped, duplicated or extra data.
      assert(outputArray.length == 4)
      assert(outputArray.head == -16)
      assert(outputArray(1) == -15)
      assert(outputArray(2) == -14)
      // The last record must be processed as well, not copied through unchanged.
      assert(outputArray(3) == -13)
    }
  }

  "getCobolSchema" should {
    "return the schema of the copybook provided" in {
      val builder = RecordProcessorBuilder.copybookContents(copybook)

      val cobolSchema = builder.getCobolSchema(ReaderParameters())

      assert(cobolSchema.copybook.ast.children.length == 1)
    }
  }

  "getReaderParameters" should {
    "return a reader according to passed options" in {
      val builder = RecordProcessorBuilder.copybookContents(copybook)
        .option("record_format", "D")

      assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
      assert(builder.getOptions.contains("record_format"))
    }
  }

  "getRecordExtractor" should {
    "work for an fixed-record-length files" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val builder = RecordProcessorBuilder.copybookContents(copybook)

      val ext = builder.getRecordExtractor(ReaderParameters(recordLength = Some(2)), stream)

      assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])

      // Four input bytes with a record length of 2 must yield exactly two records.
      assert(ext.hasNext)
      assert(ext.next().sameElements(Array(0xF1, 0xF2).map(_.toByte)))
      assert(ext.next().sameElements(Array(0xF3, 0xF4).map(_.toByte)))
      assert(!ext.hasNext)
    }

    "work for an variable-record-length files" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val builder = RecordProcessorBuilder.copybookContents(copybook)

      val ext = builder.getRecordExtractor(ReaderParameters(
        recordFormat = RecordFormat.VariableLength,
        isText = true
      ), stream)

      assert(ext.isInstanceOf[TextFullRecordExtractor])
    }

    "throw an exception on a non-supported record format for processing" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val builder = RecordProcessorBuilder.copybookContents(copybook)

      val ex = intercept[IllegalArgumentException] {
        builder.getRecordExtractor(ReaderParameters(
          recordFormat = RecordFormat.VariableLength,
          isRecordSequence = true
        ), stream)
      }

      assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters."))
    }
  }

}

0 commit comments

Comments
 (0)