Skip to content

Commit da45372

Browse files
committed
#795 Implement the CobolProcessor that allows changing record sizes.
1 parent 4b427f6 commit da45372

File tree

11 files changed

+369
-42
lines changed

11 files changed

+369
-42
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
/**
 * Defines how a `CobolProcessor` writes processed records to the output.
 *
 * The trait is sealed so the compiler can verify that pattern matches over the strategies
 * are exhaustive. Otherwise an unknown strategy would only fail at runtime with a `MatchError`.
 */
sealed trait CobolProcessingStrategy

object CobolProcessingStrategy {
  /** Selects the processor that retains the original COBOL data format in the output. */
  case object InPlace extends CobolProcessingStrategy

  /** Selects the processor that converts the input to the variable record length format with little-endian RDW headers. */
  case object ToVariableLength extends CobolProcessingStrategy
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package za.co.absa.cobrix.cobol.processor
1818

1919
import za.co.absa.cobrix.cobol.parser.Copybook
20-
import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl
20+
import za.co.absa.cobrix.cobol.processor.impl.{CobolProcessorInPlace, CobolProcessorToRdw}
2121
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
2222
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
2323
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
@@ -50,6 +50,7 @@ object CobolProcessor {
5050
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
5151
private var copybookContentsOpt: Option[String] = None
5252
private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None
53+
private var cobolProcessingStrategy: CobolProcessingStrategy = CobolProcessingStrategy.InPlace
5354

5455
def build(): CobolProcessor = {
5556
if (copybookContentsOpt.isEmpty) {
@@ -59,7 +60,10 @@ object CobolProcessor {
5960
val readerParameters = getReaderParameters
6061
val cobolSchema = getCobolSchema(readerParameters)
6162

62-
new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
63+
cobolProcessingStrategy match {
64+
case CobolProcessingStrategy.InPlace => new CobolProcessorInPlace(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
65+
case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
66+
}
6367
}
6468

6569
def load(path: String): CobolProcessorLoader = {
@@ -87,7 +91,7 @@ object CobolProcessor {
8791
val readerParameters = getReaderParameters
8892
val cobolSchema = getCobolSchema(readerParameters)
8993

90-
new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, caseInsensitiveOptions.toMap)
94+
new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, cobolProcessingStrategy, caseInsensitiveOptions.toMap)
9195
}
9296

9397
def withCopybookContents(copybookContents: String): CobolProcessorBuilder = {
@@ -100,6 +104,11 @@ object CobolProcessor {
100104
this
101105
}
102106

107+
/**
 * Selects the strategy that decides which processor implementation is created.
 *
 * The builder starts with `CobolProcessingStrategy.InPlace`. The value set here is the one
 * `build()` matches on, and the one handed to the loader returned by `load()`.
 *
 * @param strategy the processing strategy to use.
 * @return this builder, so that calls can be chained.
 */
def withProcessingStrategy(strategy: CobolProcessingStrategy): CobolProcessorBuilder = {
108+
cobolProcessingStrategy = strategy
109+
this
110+
}
111+
103112
/**
104113
* Adds a single option to the builder.
105114
*
@@ -142,9 +151,13 @@ object CobolProcessor {
142151
copybook: Copybook,
143152
rawRecordProcessor: RawRecordProcessor,
144153
readerParameters: ReaderParameters,
154+
cobolProcessingStrategy: CobolProcessingStrategy,
145155
options: Map[String, String]) {
146156
def save(outputFile: String): Long = {
147-
val processor = new CobolProcessorImpl(readerParameters, copybook, copybookContents, options)
157+
val processor = cobolProcessingStrategy match {
158+
case CobolProcessingStrategy.InPlace => new CobolProcessorInPlace(readerParameters, copybook, copybookContents, options)
159+
case CobolProcessingStrategy.ToVariableLength => new CobolProcessorToRdw(readerParameters, copybook, copybookContents, options)
160+
}
148161

149162
val ifs = new FSStream(fileToProcess)
150163
val ofs = new BufferedOutputStream(new FileOutputStream(outputFile))
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor.impl
18+
19+
import za.co.absa.cobrix.cobol.processor.CobolProcessor
20+
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
21+
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
22+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
23+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
24+
25+
/**
  * Base class holding the logic shared by the direct EBCDIC to EBCDIC data processor implementations.
  *
  * It is serializable because the processing can be run from inside an RDD.
  */
abstract class CobolProcessorBase extends CobolProcessor with Serializable {
  /**
    * Creates the raw record extractor for the given input stream.
    *
    * The extractor works on two independent copies of the input stream: one for the data
    * and one for the headers.
    *
    * @param readerParameters the reader configuration used to create the extractor.
    * @param copybookContents the raw text of the copybook.
    * @param inputStream      the stream to take the data and header copies from.
    * @return the raw record extractor for the stream.
    * @throws IllegalArgumentException if no extractor can be created for the reader parameters.
    */
  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream): RawRecordExtractor = {
    val recordsStream = inputStream.copyStream()
    val headersStream = inputStream.copyStream()

    val nestedReader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

    nestedReader.recordExtractor(0, recordsStream, headersStream).getOrElse {
      throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
        "Please check the copybook and the reader parameters."
      )
    }
  }
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImpl.scala renamed to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ import java.io.OutputStream
2727

2828
/**
2929
* Implementation of the CobolProcessor trait, responsible for processing COBOL data streams
30-
* by extracting records and applying a user-defined raw record processor.
31-
*
32-
* The processing can be done from inside an RDD so this is why it is serializable.
30+
* by extracting records and applying a user-defined raw record processor. This processor
31+
* retains the original COBOL data format in the output.
3332
*
3433
* Please, do not use this class directly. Use `CobolProcessor.builder()` instead.
3534
*
@@ -38,18 +37,18 @@ import java.io.OutputStream
3837
* @param copybookContents The raw textual representation of the copybook.
3938
* @param options A map of processing options to customize the behavior of the processor (same as for `spark-cobol`).
4039
*/
41-
class CobolProcessorImpl(readerParameters: ReaderParameters,
42-
copybook: Copybook,
43-
copybookContents: String,
44-
options: Map[String, String]) extends CobolProcessor with Serializable {
40+
class CobolProcessorInPlace(readerParameters: ReaderParameters,
41+
copybook: Copybook,
42+
copybookContents: String,
43+
options: Map[String, String]) extends CobolProcessorBase {
4544
override def process(inputStream: SimpleStream,
4645
outputStream: OutputStream)
4746
(rawRecordProcessor: RawRecordProcessor): Long = {
48-
val recordExtractor = getRecordExtractor(readerParameters, inputStream)
47+
val recordExtractor = getRecordExtractor(readerParameters, copybookContents, inputStream)
4948

5049
val dataStream = inputStream.copyStream()
5150
try {
52-
StreamProcessor.processStream(copybook,
51+
StreamProcessor.processStreamInPlace(copybook,
5352
options,
5453
dataStream,
5554
recordExtractor,
@@ -60,18 +59,5 @@ class CobolProcessorImpl(readerParameters: ReaderParameters,
6059
}
6160
}
6261

63-
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
64-
val dataStream = inputStream.copyStream()
65-
val headerStream = inputStream.copyStream()
66-
67-
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
6862

69-
reader.recordExtractor(0, dataStream, headerStream) match {
70-
case Some(extractor) => extractor
71-
case None =>
72-
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
73-
"Please check the copybook and the reader parameters."
74-
)
75-
}
76-
}
7763
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor.impl
18+
19+
import za.co.absa.cobrix.cobol.parser.Copybook
20+
import za.co.absa.cobrix.cobol.processor.RawRecordProcessor
21+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
22+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
23+
24+
import java.io.OutputStream
25+
26+
/**
  * A `CobolProcessor` implementation that extracts records from a COBOL data stream, passes each
  * of them to a user-defined raw record processor, and writes the result in the variable record
  * length format with little-endian RDW headers.
  *
  * Please, do not use this class directly. Use `CobolProcessor.builder()` instead.
  *
  * @param readerParameters Configuration for record extraction and COBOL file parsing.
  * @param copybook         The copybook definition used for interpreting COBOL data structures.
  * @param copybookContents The raw textual representation of the copybook.
  * @param options          A map of processing options to customize the behavior of the processor (same as for `spark-cobol`).
  */
class CobolProcessorToRdw(readerParameters: ReaderParameters,
                          copybook: Copybook,
                          copybookContents: String,
                          options: Map[String, String]) extends CobolProcessorBase {
  /**
    * Processes all records of the input stream and writes them to the output stream,
    * each one preceded by an RDW header.
    *
    * @param inputStream        the stream to read raw records from.
    * @param outputStream       the stream the converted records are written to.
    * @param rawRecordProcessor the per-record processing logic.
    * @return The number of records processed.
    */
  override def process(inputStream: SimpleStream,
                       outputStream: OutputStream)
                      (rawRecordProcessor: RawRecordProcessor): Long =
    StreamProcessor.processStreamToRdw(
      copybook = copybook,
      options = options,
      recordExtractor = getRecordExtractor(readerParameters, copybookContents, inputStream),
      recordProcessor = rawRecordProcessor,
      outputStream = outputStream
    )
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ object StreamProcessor {
3636
* @param outputStream the output stream where the processed records will be written.
3737
* @return The number of records processed.
3838
*/
39-
def processStream(copybook: Copybook,
40-
options: Map[String, String],
41-
inputStream: SimpleStream,
42-
recordExtractor: RawRecordExtractor,
43-
recordProcessor: RawRecordProcessor,
44-
outputStream: OutputStream): Long = {
39+
def processStreamInPlace(copybook: Copybook,
40+
options: Map[String, String],
41+
inputStream: SimpleStream,
42+
recordExtractor: RawRecordExtractor,
43+
recordProcessor: RawRecordProcessor,
44+
outputStream: OutputStream): Long = {
4545
var recordCount = 0L
4646
while (recordExtractor.hasNext) {
4747
recordCount += 1
@@ -68,4 +68,41 @@ object StreamProcessor {
6868
}
6969
recordCount
7070
}
71+
72+
/**
  * Processes a stream of COBOL raw records and writes it back as a variable length format with little-endian RDW headers.
  *
  * Each output record is preceded by a 4-byte header: two zero bytes followed by the low and
  * the high byte of the processed record length. The length therefore has to fit into two bytes.
  *
  * @param copybook        the COBOL copybook that describes the schema of the records.
  * @param options         arbitrary options used for splitting input data into records (same as 'spark-cobol' options).
  *                        Keys are lower-cased for case-insensitive handling. Can contain custom options as well.
  * @param recordExtractor the extractor that extracts raw records from the input stream.
  * @param recordProcessor the per-record processing logic implementation.
  * @param outputStream    the output stream where the processed records will be written.
  * @return The number of records processed.
  * @throws IllegalArgumentException if a processed record is longer than 65535 bytes, since such
  *                                  a length cannot be represented in the RDW header.
  */
def processStreamToRdw(copybook: Copybook,
                       options: Map[String, String],
                       recordExtractor: RawRecordExtractor,
                       recordProcessor: RawRecordProcessor,
                       outputStream: OutputStream): Long = {
  // Only two bytes of the RDW carry the record length.
  val maxRdwRecordLength = 0xFFFF
  var recordCount = 0L

  while (recordExtractor.hasNext) {
    recordCount += 1
    val record = recordExtractor.next()

    // The offset is read after next(), exactly as the context was built before this change.
    val ctx = CobolProcessorContext(copybook, options, recordExtractor.offset)

    val updatedRecord = recordProcessor.processRecord(record, ctx)
    val updatedLength = updatedRecord.length

    // Fail fast instead of silently truncating the length and corrupting the output file.
    if (updatedLength > maxRdwRecordLength) {
      throw new IllegalArgumentException(
        s"Record #$recordCount has length $updatedLength bytes after processing, which exceeds " +
          s"the maximum of $maxRdwRecordLength bytes supported by the RDW header."
      )
    }

    val rdw = Array[Byte](0, 0, (updatedLength & 0xFF).toByte, ((updatedLength >> 8) & 0xFF).toByte)

    outputStream.write(rdw)
    outputStream.write(updatedRecord)
  }

  recordCount
}
107+
71108
}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala renamed to cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.processor.CobolProcessor
2323
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor}
2424
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
2525

26-
class CobolProcessorImplSuite extends AnyWordSpec {
26+
class CobolProcessorBaseSuite extends AnyWordSpec {
2727
private val copybook =
2828
""" 01 RECORD.
2929
| 05 T PIC X.
@@ -34,9 +34,9 @@ class CobolProcessorImplSuite extends AnyWordSpec {
3434
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
3535
val processor = CobolProcessor.builder
3636
.withCopybookContents(copybook)
37-
.build().asInstanceOf[CobolProcessorImpl]
37+
.build().asInstanceOf[CobolProcessorInPlace]
3838

39-
val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream)
39+
val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), copybook, stream)
4040

4141
assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])
4242

@@ -50,12 +50,12 @@ class CobolProcessorImplSuite extends AnyWordSpec {
5050
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
5151
val processor = CobolProcessor.builder
5252
.withCopybookContents(copybook)
53-
.build().asInstanceOf[CobolProcessorImpl]
53+
.build().asInstanceOf[CobolProcessorInPlace]
5454

5555
val ext = processor.getRecordExtractor(ReaderParameters(
5656
recordFormat = RecordFormat.VariableLength,
5757
isText = true
58-
), stream)
58+
), copybook, stream)
5959

6060
assert(ext.isInstanceOf[TextFullRecordExtractor])
6161
}
@@ -64,13 +64,13 @@ class CobolProcessorImplSuite extends AnyWordSpec {
6464
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
6565
val processor = CobolProcessor.builder
6666
.withCopybookContents(copybook)
67-
.build().asInstanceOf[CobolProcessorImpl]
67+
.build().asInstanceOf[CobolProcessorInPlace]
6868

6969
val ex = intercept[IllegalArgumentException] {
7070
processor.getRecordExtractor(ReaderParameters(
7171
recordFormat = RecordFormat.VariableLength,
7272
isRecordSequence = true
73-
), stream)
73+
), copybook, stream)
7474
}
7575

7676
assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters."))

0 commit comments

Comments
 (0)