
Commit de556c2

#805 Implement index caching for VRL files for faster processing when the same files need to be processed multiple times.
1 parent 5b83b1a commit de556c2

File tree

10 files changed: +94 −27 lines
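
The new `enable_index_cache` option (see the CobolParametersParser diff below) defaults to `true` and sits alongside the existing `enable_indexes` option. As a rough illustration of how a user could toggle it from the Spark API, here is a minimal sketch; the `cobol` format name, `record_format` value, and the copybook/data paths are assumptions for the example, not part of this commit:

import org.apache.spark.sql.SparkSession

// Hypothetical session and paths, for illustration only.
val spark = SparkSession.builder()
  .appName("cobrix-index-cache-example")
  .master("local[*]")
  .getOrCreate()

val df = spark.read
  .format("cobol")                              // spark-cobol data source
  .option("copybook", "/path/to/copybook.cob")  // copybook describing the record layout
  .option("record_format", "V")                 // variable record length (VRL) input
  .option("enable_indexes", "true")             // existing option: build sparse indexes for input files
  .option("enable_index_cache", "true")         // new option from this commit (default: true)
  .load("/path/to/vrl/data")

// A second read of the same files within the same application can reuse the cached
// indexes instead of re-scanning the files to rebuild them.
val dfAgain = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cob")
  .option("record_format", "V")
  .load("/path/to/vrl/data")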

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 4 additions & 0 deletions

@@ -119,6 +119,7 @@ object CobolParametersParser extends Logging {
 
   // Indexed multisegment file processing
   val PARAM_ENABLE_INDEXES = "enable_indexes"
+  val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"
   val PARAM_INPUT_SPLIT_RECORDS = "input_split_records"
   val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb"
   val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix"

@@ -381,6 +382,7 @@
       fileEndOffset = 0,
       generateRecordId = false,
       isUsingIndex = false,
+      isIndexCachingAllowed = false,
       inputSplitRecords = None,
       inputSplitSizeMB = None,
       improveLocality = false,

@@ -416,6 +418,7 @@
       isRdwPartRecLength = varLenParams.isRdwPartRecLength,
       rdwAdjustment = varLenParams.rdwAdjustment,
       isIndexGenerationNeeded = varLenParams.isUsingIndex,
+      isIndexCachingAllowed = varLenParams.isIndexCachingAllowed,
       inputSplitRecords = varLenParams.inputSplitRecords,
       inputSplitSizeMB = varLenParams.inputSplitSizeMB,
       hdfsDefaultBlockSize = defaultBlockSize,

@@ -502,6 +505,7 @@
       fileEndOffset,
       isRecordIdGenerationEnabled,
       params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
+      params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "true").toBoolean,
       params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
       params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
       params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean,

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala

Lines changed: 1 addition & 0 deletions

@@ -99,6 +99,7 @@ case class ReaderParameters(
     isRdwPartRecLength: Boolean = false,
     rdwAdjustment: Int = 0,
     isIndexGenerationNeeded: Boolean = false,
+    isIndexCachingAllowed: Boolean = false,
     inputSplitRecords: Option[Int] = None,
     inputSplitSizeMB: Option[Int] = None,
     hdfsDefaultBlockSize: Option[Int] = None,

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala

Lines changed: 2 additions & 0 deletions

@@ -34,6 +34,7 @@ package za.co.absa.cobrix.cobol.reader.parameters
  * @param fileEndOffset A number of bytes to skip at the end of each file
  * @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data
  * @param isUsingIndex Is indexing input file before processing is requested
+ * @param isIndexCachingAllowed Is caching of generated index allowed
  * @param inputSplitSizeMB A partition size to target. In certain circumstances this size may not be exactly that, but the library will do the best effort to target that size
  * @param inputSplitRecords The number of records to include in each partition. Notice mainframe records may have variable size, inputSplitMB is the recommended option
  * @param improveLocality Tries to improve locality by extracting preferred locations for variable-length records

@@ -56,6 +57,7 @@ case class VariableLengthParameters(
     fileEndOffset: Int,
     generateRecordId: Boolean,
     isUsingIndex: Boolean,
+    isIndexCachingAllowed: Boolean,
     inputSplitRecords: Option[Int],
     inputSplitSizeMB: Option[Int],
     improveLocality: Boolean,

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala

Lines changed: 1 addition & 1 deletion

@@ -218,7 +218,7 @@ object SparkCobolProcessor {
       case reader: VarLenReader if reader.isIndexGenerationNeeded && allowIndexes =>
         val orderedFiles = CobolRelation.getListFilesWithOrder(listOfFiles, spark.sqlContext, isRecursiveRetrieval = false)
         val filesMap = orderedFiles.map(fileWithOrder => (fileWithOrder.order, fileWithOrder.filePath)).toMap
-        val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext)(LocalityParameters(improveLocality = false, optimizeAllocation = false))
+        val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(orderedFiles, cobolReader, spark.sqlContext, readerParameters.isIndexCachingAllowed)(LocalityParameters(improveLocality = false, optimizeAllocation = false))
 
         indexes.flatMap(indexEntry => {
           val filePathName = filesMap(indexEntry.fileId)

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala

Lines changed: 4 additions & 3 deletions

@@ -65,15 +65,16 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
 class CobolRelation(sourceDirs: Seq[String],
                     cobolReader: Reader,
                     localityParams: LocalityParameters,
-                    debugIgnoreFileSize: Boolean
-                   )(@transient val sqlContext: SQLContext)
+                    debugIgnoreFileSize: Boolean,
+                    indexCachingAllowed: Boolean)
+                   (@transient val sqlContext: SQLContext)
   extends BaseRelation
     with Serializable
     with TableScan {
 
   private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)
 
-  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams)
+  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, indexCachingAllowed)(localityParams)
 
   override def schema: StructType = {
     cobolReader.getSparkSchema

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 6 additions & 1 deletion

@@ -58,11 +58,16 @@ class DefaultSource
 
     val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
     CobolParametersValidator.checkSanity(cobolParameters)
+    val indexCachingAllowed = cobolParameters.variableLengthParams match {
+      case Some(varLenParams) => varLenParams.isIndexCachingAllowed
+      case None => false
+    }
 
     new CobolRelation(cobolParameters.sourcePaths,
       buildEitherReader(sqlContext.sparkSession, cobolParameters),
       LocalityParameters.extract(cobolParameters),
-      cobolParameters.debugIgnoreFileSize)(sqlContext)
+      cobolParameters.debugIgnoreFileSize,
+      indexCachingAllowed)(sqlContext)
   }
 
   /** Writer relation */

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 63 additions & 13 deletions

@@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
 import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
 import za.co.absa.cobrix.spark.cobol.utils.{HDFSUtils, SparkUtils}
 
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable.ArrayBuffer
 
 /**

@@ -45,18 +46,24 @@ import scala.collection.mutable.ArrayBuffer
  * In a nutshell, ideally, there will be as many partitions as are there are indexes.
  */
 private[cobol] object IndexBuilder extends Logging {
+  private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
+
   def buildIndex(filesList: Array[FileWithOrder],
                  cobolReader: Reader,
-                 sqlContext: SQLContext)
+                 sqlContext: SQLContext,
+                 cachingAllowed: Boolean)
                 (localityParams: LocalityParameters): RDD[SparseIndexEntry] = {
     val fs = new Path(filesList.head.filePath).getFileSystem(sqlContext.sparkSession.sparkContext.hadoopConfiguration)
 
     cobolReader match {
       case reader: VarLenReader if reader.isIndexGenerationNeeded && localityParams.improveLocality && isDataLocalitySupported(fs) =>
+        logger.info("Building indexes with data locality...")
         buildIndexForVarLenReaderWithFullLocality(filesList, reader, sqlContext, localityParams.optimizeAllocation)
       case reader: VarLenReader =>
-        buildIndexForVarLenReader(filesList, reader, sqlContext)
+        logger.info("Building indexes for variable record length input files...")
+        buildIndexForVarLenReader(filesList, reader, sqlContext, cachingAllowed)
       case _ =>
+        logger.info("Generating indexes for full files...")
         buildIndexForFullFiles(filesList, sqlContext)
     }
   }

@@ -112,24 +119,58 @@
     */
   private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder],
                                                reader: VarLenReader,
-                                               sqlContext: SQLContext): RDD[SparseIndexEntry] = {
+                                               sqlContext: SQLContext,
+                                               cachingAllowed: Boolean): RDD[SparseIndexEntry] = {
     val conf = sqlContext.sparkContext.hadoopConfiguration
     val sconf = new SerializableConfiguration(conf)
 
-    if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) {
-      selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf)
+    // Splitting between files for which indexes are cached and the list of files for which indexes are not cached
+    val cachedFiles = if (cachingAllowed) {
+      filesList.filter(f => indexCache.containsKey(f.filePath))
+    } else {
+      Array.empty[FileWithOrder]
     }
 
-    val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
+    val nonCachedFiles = filesList.diff(cachedFiles)
 
-    val indexRDD = filesRDD.mapPartitions(
-      partition => {
-        partition.flatMap(row => {
-          generateIndexEntry(row, sconf.value, reader)
-        })
-      }).cache()
+    // Getting indexes for files for which indexes are not in the cache
+    val newIndexes = if (nonCachedFiles.length > 0) {
+      if (reader.getReaderProperties.enableSelfChecks) {
+        selfCheckForIndexCompatibility(reader, nonCachedFiles.head.filePath, conf)
+      }
 
-    repartitionIndexes(indexRDD)
+      val filesRDD = sqlContext.sparkContext.parallelize(nonCachedFiles, nonCachedFiles.length)
+      filesRDD.mapPartitions(
+        partition => {
+          partition.flatMap(row => {
+            generateIndexEntry(row, sconf.value, reader)
+          })
+        }).collect()
+    } else {
+      Array.empty[SparseIndexEntry]
+    }
+
+    // Storing new indexes in the cache
+    if (cachingAllowed && newIndexes.length > 0) {
+      newIndexes.groupBy(_.fileId).foreach { case (fileId, indexEntries) =>
+        val filePathOpt = filesList.find(_.order == fileId).map(_.filePath)
+
+        filePathOpt.foreach { filePath =>
+          logger.info(s"Index stored to cache for file: $filePath.")
+          indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom))
+        }
+      }
+    }
+
+    // Getting indexes for files for which indexes are in the cache
+    val cachedIndexes = cachedFiles.flatMap { f =>
+      logger.info("Index fetched from cache for file: " + f.filePath)
+      indexCache.get(f.filePath)
+        .map(ind => ind.copy(fileId = f.order))
+    }
+
+    // Creating the final RDD with all indexes
+    createIndexRDD(cachedIndexes ++ newIndexes, sqlContext)
   }
 
   /**

@@ -336,4 +377,13 @@
     logger.info(s"Index elements count: $indexCount, number of partitions = $numPartitions")
     indexRDD.repartition(numPartitions).cache()
   }
+
+  private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
+    val indexCount = indexes.length
+
+    val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
+    logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions")
+
+    sqlContext.sparkContext.parallelize(indexes, numPartitions)
+  }
 }
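
For context, the cache introduced above is a ConcurrentHashMap held inside the IndexBuilder object, keyed by file path, so it lives in the driver JVM and the saving applies when the same files are indexed again within the same Spark application. A simplified, standalone sketch of that pattern follows; the names (IndexEntry, indexFor) are illustrative and are not the commit's API:

import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for SparseIndexEntry: an offset range within one file.
final case class IndexEntry(fileId: Int, offsetFrom: Long, offsetTo: Long)

object IndexCacheSketch {
  // Thread-safe map shared by all threads of the same JVM.
  private val cache = new ConcurrentHashMap[String, Array[IndexEntry]]()

  // Returns cached entries when present; otherwise builds them and stores the result.
  def indexFor(filePath: String, build: String => Array[IndexEntry]): Array[IndexEntry] = {
    val cached = cache.get(filePath)
    if (cached != null) {
      cached
    } else {
      val fresh = build(filePath).sortBy(_.offsetFrom)
      cache.put(filePath, fresh)
      fresh
    }
  }
}

Because the map is process-local, separate Spark applications or executors do not share it; only repeated reads within one application benefit.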

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala

Lines changed: 2 additions & 1 deletion

@@ -83,6 +83,7 @@ class FileStreamer(filePath: String, fileSystem: FileSystem, startOffset: Long =
     if (numberOfBytes <= 0) {
       new Array[Byte](0)
     } else if (actualBytesToRead <=0 || bufferedStream == null || bufferedStream.isClosed) {
+      logger.info(s"End of stream reached: Requested $numberOfBytes bytes, reached offset $byteIndex.")
       close()
       new Array[Byte](0)
     } else {

@@ -97,7 +98,7 @@
       if (readBytes == numberOfBytes) {
         buffer
       } else {
-        logger.warn(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
+        logger.info(s"End of stream reached: Requested $numberOfBytes bytes, received $readBytes.")
         close()
         if (readBytes == actualBytesToRead) {
           buffer

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala

Lines changed: 6 additions & 3 deletions

@@ -64,7 +64,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false)(sqlContext)
+      debugIgnoreFileSize = false,
+      indexCachingAllowed = false)(sqlContext)
     val cobolData: RDD[Row] = relation.parseRecords(testReader, oneRowRDD)
 
     val cobolDataFrame = sqlContext.createDataFrame(cobolData, sparkSchema)

@@ -88,7 +89,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false)(sqlContext)
+      debugIgnoreFileSize = false,
+      indexCachingAllowed = false)(sqlContext)
 
     val caught = intercept[Exception] {
       relation.parseRecords(testReader, oneRowRDD).collect()

@@ -103,7 +105,8 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false)(sqlContext)
+      debugIgnoreFileSize = false,
+      indexCachingAllowed = false)(sqlContext)
 
     val caught = intercept[SparkException] {
       relation.parseRecords(testReader, oneRowRDD).collect()

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilderSpec.scala

Lines changed: 5 additions & 5 deletions

@@ -64,7 +64,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest
 
     val localityParameters = LocalityParameters(improveLocality = true, optimizeAllocation = true)
 
-    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
+    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()
 
     assert(index.length == 3)
   }

@@ -86,7 +86,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest
 
     val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)
 
-    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
+    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()
 
     assert(index.length == 3)
   }

@@ -104,7 +104,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest
 
     val localityParameters = LocalityParameters(improveLocality = false, optimizeAllocation = false)
 
-    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext)(localityParameters).collect()
+    val index = IndexBuilder.buildIndex(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false)(localityParameters).collect()
 
     assert(index.length == 1)
   }

@@ -168,7 +168,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest
 
     val reader = new VarLenNestedReader(Seq(copybook), readerParameters)
 
-    val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
+    val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()
 
     assert(index.length == 3)
   }

@@ -188,7 +188,7 @@ class IndexBuilderSpec extends AnyWordSpec with BinaryFileFixture with SparkTest
 
     val reader = new VarLenNestedReader(Seq(copybook), readerParameters)
 
-    val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext).collect()
+    val index = IndexBuilder.buildIndexForVarLenReader(filesWithOrder, reader, spark.sqlContext, cachingAllowed = false).collect()
 
     assert(index.length == 2)
   }
