
Commit 5e5dd76

#805 Fix PR suggestions.
1 parent 9e6ff80 commit 5e5dd76

File tree

6 files changed: +34 −17


cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 4 additions & 0 deletions
@@ -950,6 +950,10 @@ object CobolParametersParser extends Logging {
         params.contains(PARAM_STRICT_INTEGRAL_PRECISION) && params(PARAM_STRICT_INTEGRAL_PRECISION).toBoolean)
       throw new IllegalArgumentException(s"Options '$PARAM_DISPLAY_PIC_ALWAYS_STRING' and '$PARAM_STRICT_INTEGRAL_PRECISION' cannot be used together.")
 
+    if (params.contains(PARAM_ENABLE_INDEXES) && !params(PARAM_ENABLE_INDEXES).toBoolean &&
+      params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean)
+      throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
+
     if (validateRedundantOptions && unusedKeys.nonEmpty) {
       val unusedKeyStr = unusedKeys.mkString(",")
       val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
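The new check fails fast when index caching is requested while index generation itself is disabled. A minimal standalone sketch of the same rule, assuming a plain Map[String, String] of options (the string keys mirror the spark-cobol option names behind PARAM_ENABLE_INDEXES and PARAM_ENABLE_INDEX_CACHE):

    def validateIndexOptions(params: Map[String, String]): Unit = {
      // Caching an index only makes sense if index generation is enabled.
      val indexesDisabled = params.get("enable_indexes").exists(!_.toBoolean)
      val cacheRequested  = params.get("enable_index_cache").exists(_.toBoolean)
      if (indexesDisabled && cacheRequested)
        throw new IllegalArgumentException(
          "When 'enable_indexes' = false, 'enable_index_cache' cannot be true.")
    }

    // validateIndexOptions(Map("enable_indexes" -> "false", "enable_index_cache" -> "true"))
    // => throws IllegalArgumentException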

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala

Lines changed: 2 additions & 3 deletions
@@ -65,16 +65,15 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
 class CobolRelation(sourceDirs: Seq[String],
                     cobolReader: Reader,
                     localityParams: LocalityParameters,
-                    debugIgnoreFileSize: Boolean,
-                    indexCachingAllowed: Boolean)
+                    debugIgnoreFileSize: Boolean)
                    (@transient val sqlContext: SQLContext)
   extends BaseRelation
     with Serializable
     with TableScan {
 
   private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)
 
-  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, indexCachingAllowed)(localityParams)
+  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cobolReader.getReaderProperties.isIndexGenerationNeeded)(localityParams)
 
   override def schema: StructType = {
     cobolReader.getSparkSchema
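Design note: CobolRelation no longer carries its own indexCachingAllowed flag; the boolean handed to IndexBuilder.buildIndex is now derived from the reader itself. A sketch of the derivation (only getReaderProperties.isIndexGenerationNeeded is taken from the diff; the val name is illustrative):

    // The relation asks the reader, rather than a constructor argument,
    // whether sparse-index generation applies to this read.
    val indexGenerationNeeded: Boolean =
      cobolReader.getReaderProperties.isIndexGenerationNeeded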

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 6 deletions
@@ -58,16 +58,11 @@ class DefaultSource
 
     val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
     CobolParametersValidator.checkSanity(cobolParameters)
-    val indexCachingAllowed = cobolParameters.variableLengthParams match {
-      case Some(varLenParams) => varLenParams.isIndexCachingAllowed
-      case None => false
-    }
 
     new CobolRelation(cobolParameters.sourcePaths,
       buildEitherReader(sqlContext.sparkSession, cobolParameters),
       LocalityParameters.extract(cobolParameters),
-      cobolParameters.debugIgnoreFileSize,
-      indexCachingAllowed)(sqlContext)
+      cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
 
   /** Writer relation */

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 2 additions & 2 deletions
@@ -124,7 +124,7 @@ private[cobol] object IndexBuilder extends Logging {
     val conf = sqlContext.sparkContext.hadoopConfiguration
     val sconf = new SerializableConfiguration(conf)
 
-    // Splitting between files for which indexes are chached and teh list of files for which indexes are not cached
+    // Splitting between files for which indexes are cached and the list of files for which indexes are not cached
     val cachedFiles = if (cachingAllowed) {
       filesList.filter(f => indexCache.containsKey(f.filePath))
     } else {
@@ -381,7 +381,7 @@ private[cobol] object IndexBuilder extends Logging {
   private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
     val indexCount = indexes.length
 
-    val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
+    val numPartitions = Math.max(1, Math.min(indexCount, Constants.maxNumPartitions))
     logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions")
 
     sqlContext.sparkContext.parallelize(indexes, numPartitions)
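The Math.max(1, ...) guard covers the empty-index case: with zero index entries, Math.min(0, Constants.maxNumPartitions) yields 0, and Spark rejects an RDD parallelized into zero slices once it is evaluated. A hedged reproduction sketch (assumes an active SparkContext named sc; 200 stands in for Constants.maxNumPartitions):

    val indexes = Array.empty[Int]  // stand-in for an empty sparse-index array

    val before = Math.min(indexes.length, 200)               // 0: parallelize(_, 0) fails on evaluation
    val after  = Math.max(1, Math.min(indexes.length, 200))  // 1: always a valid partition count

    sc.parallelize(indexes, after).count()                   // 0 rows, but no exception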

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala

Lines changed: 3 additions & 6 deletions
@@ -64,8 +64,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
     val cobolData: RDD[Row] = relation.parseRecords(testReader, oneRowRDD)
 
     val cobolDataFrame = sqlContext.createDataFrame(cobolData, sparkSchema)
@@ -89,8 +88,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
 
     val caught = intercept[Exception] {
       relation.parseRecords(testReader, oneRowRDD).collect()
@@ -105,8 +103,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
 
     val caught = intercept[SparkException] {
       relation.parseRecords(testReader, oneRowRDD).collect()

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala

Lines changed: 22 additions & 0 deletions
@@ -18,6 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.integration
 
 import org.apache.spark.SparkException
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.{PARAM_ENABLE_INDEXES, PARAM_ENABLE_INDEX_CACHE}
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
 
@@ -195,6 +196,27 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
       }
     }
 
+    "throw an exception when index caching is requested while indexes are turned off" in {
+      withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
+        val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""
+
+        val ex = intercept[IllegalArgumentException] {
+          spark.read
+            .format("cobol")
+            .option("copybook_contents", copybook)
+            .option("record_format", "F")
+            .option("record_length_field", "SEG-ID")
+            .option("enable_indexes", "false")
+            .option("enable_index_cache", "true")
+            .option("pedantic", "true")
+            .option("record_length_map", """{"A":4,"B":7,"C":8}""")
+            .load(tempFile)
+        }
+
+        assert(ex.getMessage == s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
+      }
+    }
+
     "throw an exception for unknown mapping" in {
       withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile =>
         val df = spark.read
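For contrast, a sketch of the combination the new check still allows: index caching together with indexes enabled (same hypothetical copybook contents and temp file as in the test above):

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")
      .option("record_length_field", "SEG-ID")
      .option("record_length_map", """{"A":4,"B":7,"C":8}""")
      .option("enable_indexes", "true")     // indexes enabled...
      .option("enable_index_cache", "true") // ...so caching may be requested
      .load(tempFile)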
