
Commit 98751ef (parent: 9d9c23d)

#795 Improve the syntax of generating raw record RDDs.
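In short: the raw record RDD is no longer produced straight from the builder; load(path) now returns a loader that exposes toRDD and the new getParsedCopybook. A before/after sketch distilled from the diffs below (copybook and inputPath are placeholders):

// Before this commit: the builder created the RDD directly from a path.
val rddBefore = SparkCobolProcessor.builder
  .withCopybookContents(copybook)
  .toRDD(inputPath)

// After this commit: load(...) returns a SparkCobolProcessorLoader,
// and the RDD (or the parsed copybook) is requested from it.
val rddAfter = SparkCobolProcessor.builder
  .withCopybookContents(copybook)
  .load(inputPath)
  .toRDD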

3 files changed: +42, -38 lines

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 4 additions & 4 deletions

@@ -222,7 +222,7 @@ object CobolParametersParser extends Logging {
     policy
   }
 
-  def parse(params: Parameters): CobolParameters = {
+  def parse(params: Parameters, validateRedundantOptions: Boolean = true): CobolParameters = {
     val schemaRetentionPolicy = getSchemaRetentionPolicy(params)
     val stringTrimmingPolicy = getStringTrimmingPolicy(params)
     val ebcdicCodePageName = params.getOrElse(PARAM_EBCDIC_CODE_PAGE, "common")
@@ -306,7 +306,7 @@
       MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")),
       params.getMap
     )
-    validateSparkCobolOptions(params, recordFormat)
+    validateSparkCobolOptions(params, recordFormat, validateRedundantOptions)
     cobolParameters
   }
 
@@ -753,7 +753,7 @@ object CobolParametersParser extends Logging {
    *
    * @param params Parameters provided by spark.read.option(...)
    */
-  private def validateSparkCobolOptions(params: Parameters, recordFormat: RecordFormat): Unit = {
+  private def validateSparkCobolOptions(params: Parameters, recordFormat: RecordFormat, validateRedundantOptions: Boolean): Unit = {
     val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, "false").toBoolean ||
       params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean ||
       params.contains(PARAM_FILE_START_OFFSET) ||
@@ -946,7 +946,7 @@
       params.contains(PARAM_STRICT_INTEGRAL_PRECISION) && params(PARAM_STRICT_INTEGRAL_PRECISION).toBoolean)
       throw new IllegalArgumentException(s"Options '$PARAM_DISPLAY_PIC_ALWAYS_STRING' and '$PARAM_STRICT_INTEGRAL_PRECISION' cannot be used together.")
 
-    if (unusedKeys.nonEmpty) {
+    if (validateRedundantOptions && unusedKeys.nonEmpty) {
       val unusedKeyStr = unusedKeys.mkString(",")
       val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
       if (isPedantic) {
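The flag defaults to true, so existing parse callers keep the redundant-option check; passing false skips that check entirely, letting a caller reuse a full spark-cobol option map for a task that does not consume every key (this is what getCobolParameters in the next file does for copybook parsing). A minimal sketch, assuming a copybook string is in scope and using option keys that appear elsewhere in this commit:

import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters}

// Parse the options without warning or failing on keys that are
// redundant for this particular use of the option map.
val params = new Parameters(Map("copybook_contents" -> copybook, "enable_indexes" -> "false"))
val cobolParameters = CobolParametersParser.parse(params, validateRedundantOptions = false)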

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 29 additions & 31 deletions

@@ -20,12 +20,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory
+import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorBase
 import za.co.absa.cobrix.cobol.processor.{CobolProcessingStrategy, CobolProcessor, SerializableRawRecordProcessor}
 import za.co.absa.cobrix.cobol.reader.common.Constants
 import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_ID
 import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
+import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
 import za.co.absa.cobrix.spark.cobol.reader.VarLenReader
 import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
 import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
@@ -73,10 +75,6 @@ object SparkCobolProcessor {
         throw new IllegalArgumentException("Copybook contents must be provided.")
       }
 
-      if (rawRecordProcessorOpt.isEmpty) {
-        throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
-      }
-
       if (numberOfThreads < 1) {
         throw new IllegalArgumentException("Number of threads must be at least 1.")
       }
@@ -85,23 +83,7 @@
         throw new IllegalArgumentException("At least one input file must be provided.")
       }
 
-      new SparkCobolProcessorLoader(filePaths, copybookContentsOpt.get, rawRecordProcessorOpt.get, cobolProcessingStrategy, numberOfThreads, caseInsensitiveOptions.toMap)
-    }
-
-    def toRDD(path: String): RDD[Array[Byte]] = {
-      val filePaths = FileUtils
-        .getFiles(path, spark.sparkContext.hadoopConfiguration)
-
-      toRDD(filePaths)
-    }
-
-    def toRDD(filePaths: Seq[String]): RDD[Array[Byte]] = {
-      if (copybookContentsOpt.isEmpty) {
-        throw new IllegalArgumentException("Copybook contents must be provided.")
-      }
-
-      val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
-      getRecordRdd(filePaths, copybookContentsOpt.get, caseInsensitiveOptions.toMap, sconf)
+      new SparkCobolProcessorLoader(filePaths, copybookContentsOpt.get, rawRecordProcessorOpt, cobolProcessingStrategy, numberOfThreads, caseInsensitiveOptions.toMap)
     }
 
     def withCopybookContents(copybookContents: String): SparkCobolProcessorBuilder = {
@@ -154,12 +136,16 @@
 
   class SparkCobolProcessorLoader(filesToRead: Seq[String],
                                   copybookContents: String,
-                                  rawRecordProcessor: SerializableRawRecordProcessor,
+                                  rawRecordProcessorOpt: Option[SerializableRawRecordProcessor],
                                   cobolProcessingStrategy: CobolProcessingStrategy,
                                   numberOfThreads: Int,
                                   options: Map[String, String])
                                  (implicit spark: SparkSession) {
     def save(outputPath: String): Long = {
+      if (rawRecordProcessorOpt.isEmpty) {
+        throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
+      }
+
       val cobolProcessor = CobolProcessor.builder
         .withCopybookContents(copybookContents)
         .withProcessingStrategy(cobolProcessingStrategy)
@@ -170,14 +156,25 @@
         private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
 
         override def process(listOfFiles: Seq[String], outputPath: String): Long = {
-          getFileProcessorRdd(listOfFiles, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+          getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, rawRecordProcessorOpt.get, sconf, numberOfThreads)
             .reduce(_ + _)
         }
       }
 
       log.info(s"Writing to $outputPath...")
       processor.process(filesToRead, outputPath)
     }
+
+    def getParsedCopybook: Copybook = {
+      val cobolParameters = getCobolParameters(filesToRead, copybookContents, options, ignoreRedundantOptions = true)
+      val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
+      CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters).copybook
+    }
+
+    def toRDD: RDD[Array[Byte]] = {
+      val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
+      getRecordRdd(filesToRead, copybookContents, options, sconf)
+    }
   }
 
   def builder(implicit spark: SparkSession): SparkCobolProcessorBuilder = {
@@ -186,7 +183,6 @@
 
   private def getFileProcessorRdd(listOfFiles: Seq[String],
                                   outputPath: String,
-                                  copybookContents: String,
                                   cobolProcessor: CobolProcessor,
                                   rawRecordProcessor: SerializableRawRecordProcessor,
                                   sconf: SerializableConfiguration,
@@ -195,19 +191,22 @@
     val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq
     val rdd = spark.sparkContext.parallelize(groupedFiles)
     rdd.map(group => {
-      processListOfFiles(group, outputPath, copybookContents, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
+      processListOfFiles(group, outputPath, cobolProcessor, rawRecordProcessor, sconf, numberOfThreads)
     })
   }
 
+  private def getCobolParameters(listOfFiles: Seq[String], copybookContents: String, options: Map[String, String], ignoreRedundantOptions: Boolean): CobolParameters = {
+    val varLenOptions = options + (PARAM_GENERATE_RECORD_ID -> "true")
+
+    CobolParametersParser.parse(new Parameters(varLenOptions), !ignoreRedundantOptions)
+      .copy(sourcePaths = listOfFiles, copybookContent = Option(copybookContents))
+  }
+
   private def getRecordRdd(listOfFiles: Seq[String],
                            copybookContents: String,
                            options: Map[String, String],
                            sconf: SerializableConfiguration)(implicit spark: SparkSession): RDD[Array[Byte]] = {
-
-    val varLenOptions = options + (PARAM_GENERATE_RECORD_ID -> "true")
-
-    val cobolParameters: CobolParameters = CobolParametersParser.parse(new Parameters(varLenOptions))
-      .copy(sourcePaths = listOfFiles, copybookContent = Option(copybookContents))
+    val cobolParameters = getCobolParameters(listOfFiles, copybookContents, options, ignoreRedundantOptions = false)
 
     val readerParameters = CobolParametersParser.getReaderProperties(cobolParameters, None)
     val cobolReader = DefaultSource.createVariableLengthReader(cobolParameters, spark)
@@ -248,7 +247,6 @@
 
   private def processListOfFiles(listOfFiles: Seq[String],
                                  outputPath: String,
-                                 copybookContents: String,
                                  cobolProcessor: CobolProcessor,
                                  rawRecordProcessor: SerializableRawRecordProcessor,
                                  sconf: SerializableConfiguration,
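Taken together, one loader now serves both purposes, which the updated test below exercises; a usage sketch with placeholder copybook and inputPath:

val loader = SparkCobolProcessor.builder
  .withCopybookContents(copybook)
  .option("enable_indexes", "false")
  .load(inputPath)

// Copybook AST, parsed with the redundant-option check disabled:
val parsedCopybook: Copybook = loader.getParsedCopybook

// Raw record RDD built from the loaded files:
val rdd: RDD[Array[Byte]] = loader.toRDD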

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala

Lines changed: 9 additions & 3 deletions

@@ -47,6 +47,7 @@
       val exception = intercept[IllegalArgumentException] {
         SparkCobolProcessor.builder
           .withCopybookContents(copybook).load(".")
+          .save("ignored")
       }
 
       assert(exception.getMessage.contains("A RawRecordProcessor must be provided."))
@@ -159,10 +160,13 @@
 
       writeBinaryFile(inputPath, binData)
 
-      val rdd = SparkCobolProcessor.builder
+      val rddBuilder = SparkCobolProcessor.builder
         .withCopybookContents(copybook)
         .option("enable_indexes", "false")
-        .toRDD(inputPath)
+        .load(inputPath)
+
+      val parsedCopybook = rddBuilder.getParsedCopybook
+      val rdd = rddBuilder.toRDD
 
       val count = rdd.count()
 
@@ -174,6 +178,7 @@
         .sortBy(x => x)
         .collect().mkString(", ")
 
+      assert(parsedCopybook.ast.children.length == 1)
       assert(actual == expected)
     }
   }
@@ -192,7 +197,8 @@
         .withCopybookContents(copybook)
         .option("enable_indexes", "true")
         .option("input_split_records", "2")
-        .toRDD(inputPath)
+        .load(inputPath)
+        .toRDD
 
       val count = rdd.count()
 