
Commit 02705a9

#811 Add read properties hash code as index key to avoid false cache retrieval when read options are different.
1 parent a0136c0 commit 02705a9
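
In short: the sparse-index cache was keyed by file path alone, so a second read of the same file with different read options could be served an index built under the first read's options. A hypothetical reproduction sketch (paths and the length field names are placeholders; assumes an active SparkSession in scope as `spark`):

  // Hypothetical reproduction of the false cache hit. Paths are placeholders.
  val readA = spark.read.format("cobol")
    .option("copybook", "/copybooks/company.cpy")
    .option("record_length_field", "RECORD-LEN")
    .load("/data/company.dat")

  // Same file, different options: before this change, this read could be
  // served the cached index built for readA instead of building its own.
  val readB = spark.read.format("cobol")
    .option("copybook", "/copybooks/company.cpy")
    .option("record_length_field", "SEGMENT-LEN")
    .load("/data/company.dat")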

6 files changed (+177, -9 lines)

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala

Lines changed: 4 additions & 1 deletion
@@ -221,7 +221,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
     if (isCompressed) {
       readerProperties.inputSplitSizeCompressedMB.orElse(Some(DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB))
     } else {
-      readerProperties.inputSplitSizeMB.orElse(readerProperties.fsDefaultBlockSize).map(_ * DEFAULT_FS_INDEX_SIZE_MULTIPLIER)
+      val defaultIndexSizeBofFsBlock = readerProperties.fsDefaultBlockSize.map { size =>
+        Math.min(size * DEFAULT_FS_INDEX_SIZE_MULTIPLIER, 256)
+      }
+      readerProperties.inputSplitSizeMB.orElse(defaultIndexSizeBofFsBlock)
     }
   }
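
Besides moving the default calculation into a named value, this caps the block-size-derived default at 256 MB. A minimal sketch of the selection logic, with DEFAULT_FS_INDEX_SIZE_MULTIPLIER assumed to be 2 purely for illustration (the real constant is defined elsewhere in the reader):

  // Sketch of the default index split size selection; sizes are in MB.
  val DEFAULT_FS_INDEX_SIZE_MULTIPLIER = 2  // assumed value for illustration

  def defaultSplitSizeMB(inputSplitSizeMB: Option[Int], fsDefaultBlockSizeMB: Option[Int]): Option[Int] = {
    // Derive a default from the fs block size, but never exceed 256 MB.
    val fromFsBlock = fsDefaultBlockSizeMB.map(size => Math.min(size * DEFAULT_FS_INDEX_SIZE_MULTIPLIER, 256))
    inputSplitSizeMB.orElse(fromFsBlock)  // an explicit setting always wins
  }

  assert(defaultSplitSizeMB(None, Some(128)).contains(256))     // 128 * 2 = 256
  assert(defaultSplitSizeMB(None, Some(512)).contains(256))     // capped; would be 1024 without the cap
  assert(defaultSplitSizeMB(Some(64), Some(512)).contains(64))  // explicit setting wins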

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 8 additions & 6 deletions
@@ -32,9 +32,8 @@ import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
 import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
 import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
 import za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder
-import za.co.absa.cobrix.spark.cobol.utils.{FileUtils, HDFSUtils, SparkUtils}
+import za.co.absa.cobrix.spark.cobol.utils.{FileUtils, HDFSUtils, LRUCache, SparkUtils}

-import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable.ArrayBuffer

 /**
@@ -46,7 +45,9 @@ import scala.collection.mutable.ArrayBuffer
  * In a nutshell, ideally, there will be as many partitions as there are indexes.
  */
 private[cobol] object IndexBuilder extends Logging {
-  private[cobol] val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
+  private val maxCacheSize = 10000
+
+  private[cobol] val indexCache = new LRUCache[IndexKey, Array[SparseIndexEntry]](maxCacheSize)

   def buildIndex(filesList: Array[FileWithOrder],
                  cobolReader: Reader,
@@ -123,10 +124,11 @@ private[cobol] object IndexBuilder extends Logging {
                 cachingAllowed: Boolean): RDD[SparseIndexEntry] = {
     val conf = sqlContext.sparkContext.hadoopConfiguration
     val sconf = new SerializableConfiguration(conf)
+    val readerOptionsHashCode = reader.getReaderProperties.hashCode()

     // Splitting between files for which indexes are cached and the list of files for which indexes are not cached
     val cachedFiles = if (cachingAllowed) {
-      filesList.filter(f => indexCache.containsKey(f.filePath))
+      filesList.filter(f => indexCache.containsKey(IndexKey(f.filePath, readerOptionsHashCode)))
     } else {
       Array.empty[FileWithOrder]
     }
@@ -157,15 +159,15 @@ private[cobol] object IndexBuilder extends Logging {

       filePathOpt.foreach { filePath =>
         logger.info(s"Index stored to cache for file: $filePath.")
-        indexCache.put(filePath, indexEntries.sortBy(_.offsetFrom))
+        indexCache.put(IndexKey(filePath, readerOptionsHashCode), indexEntries.sortBy(_.offsetFrom))
       }
     }
   }

   // Getting indexes for files for which indexes are in the cache
   val cachedIndexes = cachedFiles.flatMap { f =>
     logger.info("Index fetched from cache for file: " + f.filePath)
-    indexCache.get(f.filePath)
+    indexCache(IndexKey(f.filePath, readerOptionsHashCode))
       .map(ind => ind.copy(fileId = f.order))
   }
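
The key change is mechanical but worth spelling out: IndexKey is a case class, so two keys for the same path but different reader-option hashes are distinct cache entries. A self-contained sketch (the option maps below are stand-ins for the real reader properties object):

  // Minimal sketch, not the production code.
  case class IndexKey(fileName: String, optionsHashCode: Long)

  val optionsA = Map("record_length_field" -> "LEN-A")  // hypothetical read options
  val optionsB = Map("record_length_field" -> "LEN-B")  // same file, different options

  val keyA = IndexKey("file:/data/file1.dat", optionsA.hashCode())
  val keyB = IndexKey("file:/data/file1.dat", optionsB.hashCode())

  // Under the old path-only key these two reads would collide in the cache;
  // with the composite case-class key they are distinct entries.
  assert(keyA != keyB)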

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexKey.scala

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.index
+
+case class IndexKey(fileName: String, optionsHashCode: Long)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/LRUCache.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.utils
+
+import scala.collection.JavaConverters._
+
+class LRUCache[K,V](maxSize: Int, loadFactor: Float = 0.75f) {
+  private val cache = new java.util.LinkedHashMap[K, V](16, loadFactor, true) {
+    override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean = size() > maxSize
+  }
+
+  def apply(key: K): V = synchronized {
+    cache.get(key)
+  }
+
+  def containsKey(key: K): Boolean = synchronized {
+    cache.containsKey(key)
+  }
+
+  def get(key: K): Option[V] = synchronized {
+    Option(cache.get(key))
+  }
+
+  def getMap: Map[K, V] = synchronized {
+    cache.asScala.toMap
+  }
+
+  def put(key: K, value: V): Unit = synchronized {
+    cache.put(key, value)
+  }
+
+  def remove(key: K): Unit = synchronized {
+    cache.remove(key)
+  }
+}
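
A short usage sketch of the new class: the backing LinkedHashMap is constructed with accessOrder = true, so both get and put refresh an entry's recency, and removeEldestEntry evicts once size exceeds maxSize:

  // Usage sketch: capacity 2, access order determines which entry is evicted.
  val cache = new LRUCache[String, String](2)
  cache.put("a", "apple")
  cache.put("b", "banana")
  cache.get("a")            // touches "a"; "b" becomes the eldest entry
  cache.put("c", "cherry")  // exceeds maxSize, evicting "b" rather than "a"
  assert(cache.get("a").contains("apple"))
  assert(cache.get("b").isEmpty)
  assert(cache.containsKey("c"))

Every method synchronizes on the instance; compared to the previous unbounded ConcurrentHashMap, this trades some concurrency for a hard bound on cache size.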

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala

Lines changed: 6 additions & 2 deletions
@@ -197,8 +197,12 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with

       val pathNameAsCached = s"file:$tempFile"

-      assert(IndexBuilder.indexCache.get(pathNameAsCached) != null)
-      assert(IndexBuilder.indexCache.get(pathNameAsCached).length == 2)
+      val indexCacheSimplified = IndexBuilder.indexCache.getMap.map {
+        case (k, v) => (k.fileName, v)
+      }
+
+      assert(indexCacheSimplified.get(pathNameAsCached) != null)
+      assert(indexCacheSimplified(pathNameAsCached).length == 2)

       assert(actualInitial == expected)
       assert(actualCached == expected)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/utils/LRUCacheSuite.scala

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.utils
+
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.spark.cobol.utils.LRUCache
+
+class LRUCacheSuite extends AnyWordSpec {
+  "LRUCache" should {
+    "remember items put there" in {
+      val cache = new LRUCache[Int, String](3)
+      cache.put(1, "one")
+      cache.put(2, "two")
+      cache.put(3, "three")
+      assert(cache.get(1).contains("one"))
+      assert(cache.get(2).contains("two"))
+      assert(cache.get(3).contains("three"))
+    }
+
+    "forget old items" in {
+      val cache = new LRUCache[Int, String](3)
+      cache.put(1, "one")
+      cache.put(2, "two")
+      cache.put(3, "three")
+      cache.put(4, "four")
+      assert(cache.get(1).isEmpty)
+      assert(cache(1) == null)
+      assert(cache.get(2).contains("two"))
+      assert(cache(2) == "two")
+      assert(cache.get(3).contains("three"))
+      assert(cache.get(4).contains("four"))
+    }
+
+    "remember frequently used items" in {
+      val cache = new LRUCache[Int, String](3)
+      cache.put(1, "one")
+      cache.put(2, "two")
+      cache.put(3, "three")
+      cache.get(1)
+      cache.get(3)
+      cache.put(4, "four")
+
+      assert(cache.get(1).contains("one"))
+      assert(cache.get(2).isEmpty)
+      assert(cache.get(3).contains("three"))
+      assert(cache.get(4).contains("four"))
+    }
+
+    "allow invalidating of values" in {
+      val cache = new LRUCache[Int, String](3)
+      cache.put(1, "one")
+      cache.put(2, "two")
+      cache.put(3, "three")
+      cache.remove(3)
+      cache.remove(4)
+      cache.get(1)
+      cache.get(3)
+      cache.put(4, "four")
+
+      assert(cache.containsKey(1))
+      assert(!cache.containsKey(8))
+      assert(cache(1) == "one")
+      assert(cache(2) == "two")
+      assert(cache(3) == null)
+      assert(cache(4) == "four")
+    }
+
+    "return the cache as a map" in {
+      val cache = new LRUCache[Int, String](3)
+      cache.put(1, "one")
+      cache.put(2, "two")
+      cache.put(3, "three")
+
+      assert(cache.getMap.size == 3)
+    }
+  }
+}
