Skip to content

Commit a0f4b2a

Browse files
committed
#769 Add library methods to implement ebcdic to ebcdic file processor.
1 parent 2c068b7 commit a0f4b2a

File tree

11 files changed

+317
-4
lines changed

11 files changed

+317
-4
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.parser.ast.Group
20+
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
21+
22+
/**
23+
* A handler for processing COBOL records and mapping it to JVM data structures.
24+
*
25+
* This implementation uses an array to group data fields of struct fields.
26+
*/
27+
class ArrayOfAnyHandler extends RecordHandler[scala.Array[Any]] {
28+
override def create(values: Array[Any], group: Group): Array[Any] = values
29+
30+
override def toSeq(record: Array[Any]): Seq[Any] = record.toSeq
31+
32+
override def foreach(record: Array[Any])(f: Any => Unit): Unit = record.foreach(f)
33+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.parser.ast.Group
20+
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
21+
22+
/**
23+
* A handler for processing COBOL records and mapping it to JVM data structures.
24+
*
25+
* This implementation uses a map from a string field name to value to represent struct fields from data records.
26+
*/
27+
class MapOfAnyHandler extends RecordHandler[Map[String, Any]] {
28+
override def create(values: Array[Any], group: Group): Map[String, Any] = {
29+
(group.children zip values).map(t => t._1.name -> (t._2 match {
30+
case s: Array[Any] => s.toSeq
31+
case s => s
32+
})).toMap
33+
}
34+
35+
override def toSeq(record: Map[String, Any]): Seq[Any] = {
36+
record.values.toSeq
37+
}
38+
39+
override def foreach(record: Map[String, Any])(f: Any => Unit): Unit = record.values.foreach(f)
40+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.parser.Copybook
20+
21+
/**
22+
* A trait that defines a processor for raw COBOL records.
23+
* It provides a method to process a single COBOL record based on the provided copybook and options.
24+
*/
25+
trait RawRecordProcessor {
26+
def processRecord(copybook: Copybook,
27+
options: Map[String, String],
28+
record: Array[Byte],
29+
offset: Long): Array[Byte]
30+
31+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
20+
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
21+
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
22+
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
23+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
24+
25+
import java.io.OutputStream
26+
import scala.collection.mutable
27+
import scala.reflect.ClassTag
28+
29+
object RecordProcessorBuilder {
30+
/**
31+
* Creates a new instance of the RecordProcessorBuilder with the given copybook content.
32+
* The instabce is used to create an COBOL data processor allowing applying changes to a mainrame file
33+
* without changing the original format.
34+
*
35+
* @param copybookContent the COBOL copybook content as a string.
36+
* @return a new RecordProcessorBuilder instance.
37+
*/
38+
def copybookContents(copybookContent: String): RecordProcessorBuilder = {
39+
new RecordProcessorBuilder(copybookContent)
40+
}
41+
}
42+
43+
class RecordProcessorBuilder(copybookContents: String) {
44+
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
45+
46+
/**
47+
* Adds a single option to the builder.
48+
*
49+
* @param key the option key.
50+
* @param value the option value.
51+
* @return this builder instance for method chaining.
52+
*/
53+
def option(key: String, value: String): RecordProcessorBuilder = {
54+
caseInsensitiveOptions += (key.toLowerCase -> value)
55+
this
56+
}
57+
58+
/**
59+
* Adds multiple options to the builder.
60+
*
61+
* @param options a map of option key-value pairs.
62+
* @return this builder instance for method chaining.
63+
*/
64+
def options(options: Map[String, String]): RecordProcessorBuilder = {
65+
caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
66+
this
67+
}
68+
69+
/**
70+
* Processes the input stream of COBOL records and writes the output to the specified output stream.
71+
*
72+
* @param inputStream the input stream containing raw COBOL records.
73+
* @param outputStream the output stream where processed records will be written.
74+
* @param rawRecordProcessor the processor that processes each raw record.
75+
*/
76+
def process(inputStream: SimpleStream,
77+
outputStream: OutputStream)
78+
(rawRecordProcessor: RawRecordProcessor): Unit = {
79+
val readerParameters = getReaderParameters
80+
val cobolSchema = getCobolSchema(readerParameters)
81+
val recordExtractor = getRecordExtractor(readerParameters, inputStream)
82+
83+
val dataStream = inputStream.copyStream()
84+
try {
85+
StreamProcessor.processStream(cobolSchema.copybook,
86+
caseInsensitiveOptions.toMap,
87+
dataStream,
88+
recordExtractor,
89+
rawRecordProcessor,
90+
outputStream)
91+
} finally {
92+
dataStream.close()
93+
}
94+
}
95+
96+
private def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
97+
CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters)
98+
}
99+
100+
private def getReaderParameters: ReaderParameters = {
101+
val cobolParameters = CobolParametersParser.parse(new Parameters(caseInsensitiveOptions.toMap))
102+
103+
CobolParametersParser.getReaderProperties(cobolParameters, None)
104+
}
105+
106+
private def getRecordExtractor[T: ClassTag](readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
107+
val dataStream = inputStream.copyStream()
108+
val headerStream = inputStream.copyStream()
109+
110+
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
111+
112+
reader.recordExtractor(0, dataStream, headerStream).get
113+
}
114+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
import za.co.absa.cobrix.cobol.parser.Copybook
20+
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
21+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
22+
23+
import java.io.OutputStream
24+
25+
object StreamProcessor {
26+
/**
27+
* Processes a stream of COBOL raw records and writes it back in the same format as the input data.
28+
*
29+
* @param copybook the COBOL copybook that describes the schema of the records.
30+
* @param options arbitrary options used for splitting input data into records. Same as options to 'spark-cobol'. Can contain custom options as well.
31+
* @param inputStream the input stream containing the raw COBOL records.
32+
* @param recordExtractor the extractor that extracts raw records from the input stream.
33+
* @param recordProcessor the per-record processing logic implementation.
34+
* @param outputStream the output stream where the processed records will be written.
35+
*/
36+
def processStream(copybook: Copybook,
37+
options: Map[String, String],
38+
inputStream: SimpleStream,
39+
recordExtractor: RawRecordExtractor,
40+
recordProcessor: RawRecordProcessor,
41+
outputStream: OutputStream): Unit = {
42+
var i = 0
43+
while (recordExtractor.hasNext) {
44+
i += 1
45+
val record = recordExtractor.next()
46+
val recordSize = record.length
47+
48+
val updatedRecord = if (recordExtractor.hasNext) {
49+
recordProcessor.processRecord(copybook, options, record, recordExtractor.offset)
50+
} else {
51+
record
52+
}
53+
54+
val headerSize = recordExtractor.offset - recordSize - inputStream.offset
55+
if (headerSize > 0) {
56+
val header = inputStream.next(headerSize.toInt)
57+
outputStream.write(header)
58+
}
59+
inputStream.next(recordSize)
60+
outputStream.write(updatedRecord)
61+
}
62+
63+
val footerSize = inputStream.size - inputStream.offset
64+
if (footerSize > 0) {
65+
val footer = inputStream.next(footerSize.toInt)
66+
outputStream.write(footer)
67+
}
68+
}
69+
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package za.co.absa.cobrix.cobol.reader.stream
1818

19-
import java.io.{BufferedInputStream, File, FileInputStream, IOException}
19+
import java.io.{BufferedInputStream, File, FileInputStream, FileNotFoundException, IOException}
2020

2121
class FSStream (fileName: String) extends SimpleStream {
2222
val bytesStream = new BufferedInputStream(new FileInputStream(fileName))
@@ -33,7 +33,6 @@ class FSStream (fileName: String) extends SimpleStream {
3333

3434
override def inputFileName: String = fileName
3535

36-
@throws(classOf[IllegalArgumentException])
3736
@throws(classOf[IOException])
3837
override def next(numberOfBytes: Int): Array[Byte] = {
3938
if (numberOfBytes <= 0) throw new IllegalArgumentException("Value of numberOfBytes should be greater than zero.")
@@ -55,4 +54,9 @@ class FSStream (fileName: String) extends SimpleStream {
5554
isClosed = true
5655
}
5756
}
57+
58+
@throws(classOf[FileNotFoundException])
59+
override def copyStream(): SimpleStream = {
60+
new FSStream(fileName)
61+
}
5862
}

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ trait SimpleStream {
2929

3030
def isEndOfStream: Boolean = offset >= size
3131

32+
@throws(classOf[Exception])
33+
def copyStream(): SimpleStream
34+
3235
@throws(classOf[Exception]) def next(numberOfBytes: Int): Array[Byte]
3336

3437
@throws(classOf[Exception]) def close(): Unit

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package za.co.absa.cobrix.cobol.mock
1818

19-
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
19+
import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
20+
21+
import java.io.FileNotFoundException
2022

2123
class ByteStreamMock(bytes: Array[Byte]) extends SimpleStream{
2224

@@ -48,4 +50,8 @@ class ByteStreamMock(bytes: Array[Byte]) extends SimpleStream{
4850
}
4951

5052
override def close(): Unit = position = sz
53+
54+
override def copyStream(): SimpleStream = {
55+
new ByteStreamMock(bytes)
56+
}
5157
}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package za.co.absa.cobrix.cobol.reader.memorystream
1818

19+
import za.co.absa.cobrix.cobol.mock.ByteStreamMock
1920
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
2021

2122
class TestByteStream(bytes: Array[Byte]) extends SimpleStream{
@@ -48,4 +49,8 @@ class TestByteStream(bytes: Array[Byte]) extends SimpleStream{
4849
}
4950

5051
override def close(): Unit = position = sz
52+
53+
override def copyStream(): SimpleStream = {
54+
new TestByteStream(bytes)
55+
}
5156
}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ class TestStringStream(str: String) extends SimpleStream{
4848
}
4949

5050
override def close(): Unit = position = sz
51+
52+
override def copyStream(): SimpleStream = {
53+
new TestStringStream(str)
54+
}
5155
}

0 commit comments

Comments
 (0)