
Commit c530c68

#788 Change the SparkCobolProcessor logic to be based on the builder pattern.
1 parent f2c8397

6 files changed: +88 -36 lines changed
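In short, this commit replaces the terminal build()/process() pair on SparkCobolProcessor with a fluent load(...)/save(...) pair and moves the per-record arguments (copybook, options, offset) into a new CobolProcessorContext. A hedged before/after sketch in Scala, assuming an implicit SparkSession in scope; all paths are placeholders:

// Before (sketch): build a processor, then call process() explicitly.
val processor = SparkCobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor(rawRecordProcessor)
  .build()
processor.process(Seq("/input/data1.dat"), "/output")

// After (sketch): load the input, then save the processed records.
SparkCobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor(rawRecordProcessor)
  .load("/input")   // a single path; an overload accepts Seq[String]
  .save("/output")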
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorContext.scala

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.processor
+
+import za.co.absa.cobrix.cobol.parser.Copybook
+
+case class CobolProcessorContext(copybook: Copybook,
+                                 options: Map[String, String],
+                                 currentOffset: Long)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/RawRecordProcessor.scala

Lines changed: 1 addition & 7 deletions
@@ -16,16 +16,10 @@
 
 package za.co.absa.cobrix.cobol.processor
 
-import za.co.absa.cobrix.cobol.parser.Copybook
-
 /**
  * A trait that defines a processor for raw COBOL records.
  * It provides a method to process a single COBOL record based on the provided copybook and options.
  */
 trait RawRecordProcessor {
-  def processRecord(copybook: Copybook,
-                    options: Map[String, String],
-                    record: Array[Byte],
-                    offset: Long): Array[Byte]
+  def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte]
 }
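To illustrate the new signature, a minimal sketch of an implementation; the copybook, options, and current offset now arrive through the context instead of as separate parameters. The XOR transform and the "mask" option key are made up for this example:

import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, RawRecordProcessor}

val maskingProcessor = new RawRecordProcessor {
  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
    // Everything that used to be passed as arguments is available on ctx:
    // ctx.copybook, ctx.options, ctx.currentOffset.
    val mask = ctx.options.getOrElse("mask", "0").toByte // hypothetical option key
    record.map(b => (b ^ mask).toByte)
  }
}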

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/StreamProcessor.scala

Lines changed: 4 additions & 2 deletions
@@ -17,7 +17,7 @@
 package za.co.absa.cobrix.cobol.processor.impl
 
 import za.co.absa.cobrix.cobol.parser.Copybook
-import za.co.absa.cobrix.cobol.processor.RawRecordProcessor
+import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, RawRecordProcessor}
 import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
 import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
 
@@ -48,7 +48,9 @@ object StreamProcessor {
       val record = recordExtractor.next()
       val recordSize = record.length
 
-      val updatedRecord = recordProcessor.processRecord(copybook, options, record, recordExtractor.offset)
+      val ctx = CobolProcessorContext(copybook, options, recordExtractor.offset)
+
+      val updatedRecord = recordProcessor.processRecord(record, ctx)
 
       val headerSize = recordExtractor.offset - recordSize - inputStream.offset
       if (headerSize > 0) {

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
     val builder = CobolProcessor.builder(copybook)
 
     val processor = new RawRecordProcessor {
-      override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
+      override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
        record.map(v => (v - 1).toByte)
      }
    }
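For completeness, a hedged sketch of the core (non-Spark) builder chain, assembled only from the calls visible elsewhere in this commit (builder(...), options(...), build()); the option map is a placeholder:

val cobolProcessor = CobolProcessor.builder(copybookContents)
  .options(Map("record_format" -> "F")) // placeholder options
  .build()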

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 36 additions & 13 deletions
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory
 import za.co.absa.cobrix.cobol.processor.{CobolProcessor, SerializableRawRecordProcessor}
 import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
 import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
+import za.co.absa.cobrix.spark.cobol.utils.FileUtils
 
 import java.io.BufferedOutputStream
 import java.util.concurrent.{ExecutorService, Executors}
@@ -50,7 +51,14 @@ object SparkCobolProcessor {
     private var rawRecordProcessorOpt: Option[SerializableRawRecordProcessor] = None
     private var numberOfThreads: Int = 1
 
-    def build(): SparkCobolProcessor = {
+    def load(path: String): SparkCobolProcessorLoader = {
+      val filePaths = FileUtils
+        .getFiles(path, spark.sparkContext.hadoopConfiguration)
+
+      load(filePaths)
+    }
+
+    def load(filePaths: Seq[String]): SparkCobolProcessorLoader = {
       if (copybookContentsOpt.isEmpty) {
         throw new IllegalArgumentException("Copybook contents must be provided.")
       }
@@ -63,18 +71,11 @@ object SparkCobolProcessor {
         throw new IllegalArgumentException("Number of threads must be at least 1.")
       }
 
-      val cobolProcessor = CobolProcessor.builder(copybookContentsOpt.get)
-        .options(caseInsensitiveOptions.toMap)
-        .build()
-
-      new SparkCobolProcessor {
-        private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
-
-        override def process(listOfFiles: Seq[String], outputPath: String): Long = {
-          getFileProcessorRdd(listOfFiles, outputPath, copybookContentsOpt.get, cobolProcessor, rawRecordProcessorOpt.get, sconf, numberOfThreads)
-            .reduce(_ + _)
-        }
+      if (filePaths.isEmpty) {
+        throw new IllegalArgumentException("At least one input file must be provided.")
       }
+
+      new SparkCobolProcessorLoader(filePaths, copybookContentsOpt.get, rawRecordProcessorOpt.get, numberOfThreads, caseInsensitiveOptions.toMap)
     }
 
     def withCopybookContents(copybookContents: String): SparkCobolProcessorBuilder = {
@@ -118,9 +119,31 @@ object SparkCobolProcessor {
       caseInsensitiveOptions ++= options.map(kv => (kv._1.toLowerCase(), kv._2))
       this
     }
-
   }
 
+  class SparkCobolProcessorLoader(filesToRead: Seq[String],
+                                  copybookContents: String,
+                                  rawRecordProcessor: SerializableRawRecordProcessor,
+                                  numberOfThreads: Int,
+                                  options: Map[String, String])
+                                 (implicit spark: SparkSession) {
+    def save(outputPath: String): Long = {
+      val cobolProcessor = CobolProcessor.builder(copybookContents)
+        .options(options)
+        .build()
+
+      val processor = new SparkCobolProcessor {
+        private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
+
+        override def process(listOfFiles: Seq[String], outputPath: String): Long = {
+          getFileProcessorRdd(listOfFiles, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+            .reduce(_ + _)
+        }
+      }
+
+      processor.process(filesToRead, outputPath)
+    }
+  }
 
   def builder(implicit spark: SparkSession): SparkCobolProcessorBuilder = {
     new SparkCobolProcessorBuilder
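Note that save returns a Long: the RDD produced by getFileProcessorRdd is summed with reduce(_ + _), giving one total across all processed files. A hedged sketch of the Seq overload together with the thread setting; all values are placeholders:

val total: Long = SparkCobolProcessor.builder
  .withCopybookContents(copybookContents)
  .withRecordProcessor(rawRecordProcessor)
  .withMultithreaded(4)                      // must be at least 1
  .load(Seq("/input/a.dat", "/input/b.dat")) // fails fast if the Seq is empty
  .save("/output")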

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala

Lines changed: 23 additions & 13 deletions
@@ -18,8 +18,7 @@ package za.co.absa.cobrix.spark.cobol
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.wordspec.AnyWordSpec
-import za.co.absa.cobrix.cobol.parser.Copybook
-import za.co.absa.cobrix.cobol.processor.SerializableRawRecordProcessor
+import za.co.absa.cobrix.cobol.processor.{CobolProcessorContext, SerializableRawRecordProcessor}
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
 
@@ -30,15 +29,15 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
       |""".stripMargin
 
   private val rawRecordProcessor = new SerializableRawRecordProcessor {
-    override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
+    override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
       record.map(v => (v - 1).toByte)
     }
   }
 
   "SparkCobolProcessor" should {
     "fail to create when a copybook is not specified" in {
       val exception = intercept[IllegalArgumentException] {
-        SparkCobolProcessor.builder.build()
+        SparkCobolProcessor.builder.load(".")
       }
 
       assert(exception.getMessage.contains("Copybook contents must be provided."))
@@ -47,8 +46,7 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
     "fail to create when a record processor is not provided" in {
       val exception = intercept[IllegalArgumentException] {
         SparkCobolProcessor.builder
-          .withCopybookContents(copybook)
-          .build()
+          .withCopybookContents(copybook).load(".")
       }
 
       assert(exception.getMessage.contains("A RawRecordProcessor must be provided."))
@@ -60,13 +58,24 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
           .withCopybookContents(copybook)
           .withRecordProcessor(rawRecordProcessor)
          .withMultithreaded(0)
-          .build()
+          .load("")
      }

      assert(exception.getMessage.contains("Number of threads must be at least 1."))
    }

-    "create a processor that processes files via an RDD" in {
+    "fail when no files are provided" in {
+      val exception = intercept[IllegalArgumentException] {
+        SparkCobolProcessor.builder
+          .withCopybookContents(copybook)
+          .withRecordProcessor(rawRecordProcessor)
+          .load(Seq.empty)
+      }
+
+      assert(exception.getMessage.contains("At least one input file must be provided."))
+    }
+
+    "process files via an RDD" in {
      withTempDirectory("spark_cobol_processor") { tempDir =>
        val binData = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)
@@ -76,12 +85,13 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar
 
       writeBinaryFile(inputPath, binData)
 
-      val processor = SparkCobolProcessor.builder
+      SparkCobolProcessor.builder
         .withCopybookContents(copybook)
-        .withRecordProcessor(rawRecordProcessor)
-        .build()
-
-      processor.process(Seq(inputPath), outputPath)
+        .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
+          record.map(v => (v - 1).toByte)
+        }
+        .load(inputPath)
+        .save(outputPath)
 
       val outputData = readBinaryFile(outputFile)