
Commit 89520b8

#795 Add a feature to get RDD of raw records for raw EBCDIC processing.
1 parent daef609 commit 89520b8

6 files changed: +74 −18 lines changed
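
For orientation, here is a minimal usage sketch of the feature this commit adds, inferred from the diff and the new test below. It assumes an implicit SparkSession and a copybook string named copybook are in scope; the input path is illustrative:

  import org.apache.spark.rdd.RDD
  import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor

  // Get raw (still EBCDIC-encoded) records back as one Array[Byte] per record,
  // without decoding them into Spark rows.
  val rawRecords: RDD[Array[Byte]] =
    SparkCobolProcessor.builder
      .withCopybookContents(copybook)
      .toRDD("/path/to/ebcdic/data")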

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBase.scala

Lines changed: 4 additions & 2 deletions
@@ -27,8 +27,10 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
   *
   * The processing can be done from inside an RDD so this is why it is serializable.
   */
-abstract class CobolProcessorBase extends CobolProcessor with Serializable {
-  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream): RawRecordExtractor = {
+abstract class CobolProcessorBase extends CobolProcessor with Serializable
+
+object CobolProcessorBase {
+  def getRecordExtractor(readerParameters: ReaderParameters, copybookContents: String, inputStream: SimpleStream): RawRecordExtractor = {
     val dataStream = inputStream.copyStream()
     val headerStream = inputStream.copyStream()

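Two things change here: getRecordExtractor moves from the abstract class into a CobolProcessorBase object, and it loses its private[processor] qualifier, which is what lets the spark-cobol module call it further down. The likely motivation, given the class comment about serializability, is that an object method is resolved statically on executors, while an instance method used inside an RDD closure drags the whole instance into the closure. A generic sketch of that distinction (illustrative names, not Cobrix code):

  class InstanceHelper extends Serializable {
    def unsign(b: Byte): Int = b & 0xFF // using this in rdd.map captures `this`
  }

  object StaticHelper {
    def unsign(b: Byte): Int = b & 0xFF // no enclosing instance to serialize
  }

  // rdd.map(helper.unsign)       // ships the InstanceHelper object with each task
  // rdd.map(StaticHelper.unsign) // ships only the function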

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters,
   override def process(inputStream: SimpleStream,
                        outputStream: OutputStream)
                       (rawRecordProcessor: RawRecordProcessor): Long = {
-    val recordExtractor = getRecordExtractor(readerParameters, copybookContents, inputStream)
+    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream)

     val dataStream = inputStream.copyStream()
     try {

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorToRdw.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ class CobolProcessorToRdw(readerParameters: ReaderParameters,
   override def process(inputStream: SimpleStream,
                        outputStream: OutputStream)
                       (rawRecordProcessor: RawRecordProcessor): Long = {
-    val recordExtractor = getRecordExtractor(readerParameters, copybookContents, inputStream)
+    val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream)

     StreamProcessor.processStreamToRdw(copybook,
       options,

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorBaseSuite.scala

Lines changed: 3 additions & 12 deletions
@@ -32,11 +32,8 @@ class CobolProcessorBaseSuite extends AnyWordSpec {
   "getRecordExtractor" should {
     "work for an fixed-record-length files" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder
-        .withCopybookContents(copybook)
-        .build().asInstanceOf[CobolProcessorInPlace]

-      val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), copybook, stream)
+      val ext = CobolProcessorBase.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), copybook, stream)

       assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])

@@ -48,11 +45,8 @@ class CobolProcessorBaseSuite extends AnyWordSpec {

     "work for an variable-record-length files" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder
-        .withCopybookContents(copybook)
-        .build().asInstanceOf[CobolProcessorInPlace]

-      val ext = processor.getRecordExtractor(ReaderParameters(
+      val ext = CobolProcessorBase.getRecordExtractor(ReaderParameters(
         recordFormat = RecordFormat.VariableLength,
         isText = true
       ), copybook, stream)

@@ -62,12 +56,9 @@ class CobolProcessorBaseSuite extends AnyWordSpec {

     "throw an exception on a non-supported record format for processing" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder
-        .withCopybookContents(copybook)
-        .build().asInstanceOf[CobolProcessorInPlace]

       val ex = intercept[IllegalArgumentException] {
-        processor.getRecordExtractor(ReaderParameters(
+        CobolProcessorBase.getRecordExtractor(ReaderParameters(
           recordFormat = RecordFormat.VariableLength,
           isRecordSequence = true
         ), copybook, stream)
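
The fixed-length expectation in the first test can be checked by hand: four input bytes with recordLength = Some(2) should split into two 2-byte records. A plain-Scala sketch of that split (not the extractor implementation itself):

  val bytes = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)
  val records = bytes.grouped(2).toList // a fixed record length of 2
  assert(records.length == 2)
  assert(records.head.sameElements(Array(0xF1, 0xF2).map(_.toByte)))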

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 35 additions & 0 deletions
@@ -20,7 +20,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory
+import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorBase
 import za.co.absa.cobrix.cobol.processor.{CobolProcessingStrategy, CobolProcessor, SerializableRawRecordProcessor}
+import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters}
 import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
 import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
 import za.co.absa.cobrix.spark.cobol.utils.FileUtils

@@ -79,6 +81,22 @@ object SparkCobolProcessor {
       new SparkCobolProcessorLoader(filePaths, copybookContentsOpt.get, rawRecordProcessorOpt.get, cobolProcessingStrategy, numberOfThreads, caseInsensitiveOptions.toMap)
     }

+    def toRDD(path: String): RDD[Array[Byte]] = {
+      val filePaths = FileUtils
+        .getFiles(path, spark.sparkContext.hadoopConfiguration)
+
+      toRDD(filePaths)
+    }
+
+    def toRDD(filePaths: Seq[String]): RDD[Array[Byte]] = {
+      if (copybookContentsOpt.isEmpty) {
+        throw new IllegalArgumentException("Copybook contents must be provided.")
+      }
+
+      val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
+      getRecordRdd(filePaths, copybookContentsOpt.get, caseInsensitiveOptions.toMap, sconf)
+    }
+
     def withCopybookContents(copybookContents: String): SparkCobolProcessorBuilder = {
       copybookContentsOpt = Option(copybookContents)
       this

@@ -174,6 +192,23 @@
     })
   }

+  private def getRecordRdd(listOfFiles: Seq[String],
+                           copybookContents: String,
+                           options: Map[String, String],
+                           sconf: SerializableConfiguration)(implicit spark: SparkSession): RDD[Array[Byte]] = {
+
+    val cobolParameters = CobolParametersParser.parse(new Parameters(options))
+    val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
+
+    spark.sparkContext.parallelize(listOfFiles).flatMap { inputFile =>
+      val hadoopConfig = sconf.value
+      val inputFs = new Path(inputFile).getFileSystem(hadoopConfig)
+      val ifs = new FileStreamer(inputFile, inputFs)
+
+      CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, ifs)
+    }
+  }
+
   private def processListOfFiles(listOfFiles: Seq[String],
                                  outputPath: String,
                                  copybookContents: String,

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala

Lines changed: 30 additions & 2 deletions
@@ -87,7 +87,7 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar

       SparkCobolProcessor.builder
         .withCopybookContents(copybook)
-        .withRecordProcessor (new SerializableRawRecordProcessor {
+        .withRecordProcessor(new SerializableRawRecordProcessor {
           override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
             record.map(v => (v - 1).toByte)
           }

@@ -119,7 +119,7 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
       SparkCobolProcessor.builder
         .withCopybookContents(copybook)
         .withProcessingStrategy(CobolProcessingStrategy.ToVariableLength)
-        .withRecordProcessor (new SerializableRawRecordProcessor {
+        .withRecordProcessor(new SerializableRawRecordProcessor {
           override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
             record.map(v => (v - 1).toByte)
           }

@@ -148,4 +148,32 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
       }
     }
   }
+
+  "convert input format into an RDD" in {
+    val expected = """-13, -14, -15"""
+    withTempDirectory("spark_cobol_processor") { tempDir =>
+      val binData = Array(0xF1, 0xF2, 0xF3, 0xF1).map(_.toByte)
+
+      val inputPath = new Path(tempDir, "input.dat").toString
+      val outputPath = new Path(tempDir, "output").toString
+
+      writeBinaryFile(inputPath, binData)
+
+      val rdd = SparkCobolProcessor.builder
+        .withCopybookContents(copybook)
+        .toRDD(inputPath)
+
+      val count = rdd.count()
+
+      assert(count == 4)
+
+      val actual = rdd
+        .map(row => row.mkString)
+        .distinct
+        .sortBy(x => x)
+        .collect().mkString(", ")
+
+      assert(actual == expected)
+    }
+  }
 }
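
The expected string in the new test follows from byte arithmetic. Each record here is a single byte (four input bytes yield a count of 4): 0xF1 through 0xF3 are the EBCDIC digits '1' to '3', and as signed JVM bytes they are -15, -14 and -13. mkString on a one-byte record renders that decimal value, distinct collapses the duplicate 0xF1, and lexicographic sorting puts "-13" first. A one-off check of the arithmetic:

  val printed = Array(0xF1, 0xF2, 0xF3).map(_.toByte).map(b => Array(b).mkString)
  assert(printed.sorted.mkString(", ") == "-13, -14, -15")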
