|
31 | 31 | import org.elasticsearch.common.settings.Settings; |
32 | 32 | import org.elasticsearch.common.unit.ByteSizeValue; |
33 | 33 | import org.elasticsearch.common.util.concurrent.ConcurrentCollections; |
34 | | -import org.elasticsearch.common.util.concurrent.ReleasableLock; |
35 | 34 | import org.elasticsearch.core.Nullable; |
36 | 35 | import org.elasticsearch.core.TimeValue; |
37 | 36 | import org.elasticsearch.lucene.util.BitSets; |
|
52 | 51 | import java.util.concurrent.TimeUnit; |
53 | 52 | import java.util.concurrent.atomic.AtomicLong; |
54 | 53 | import java.util.concurrent.atomic.LongAdder; |
55 | | -import java.util.concurrent.locks.ReentrantReadWriteLock; |
56 | 54 | import java.util.function.LongSupplier; |
57 | 55 |
|
58 | 56 | /** |
@@ -107,16 +105,6 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen |
107 | 105 |
|
108 | 106 | private static final BitSet NULL_MARKER = new FixedBitSet(0); |
109 | 107 |
|
110 | | - /** |
111 | | - * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}. |
112 | | - * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the |
113 | | - * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}. |
114 | | - * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately |
115 | | - * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache} |
116 | | - * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}. |
117 | | - */ |
118 | | - private final ReleasableLock cacheEvictionLock; |
119 | | - private final ReleasableLock cacheModificationLock; |
120 | 108 | private final ExecutorService cleanupExecutor; |
121 | 109 |
|
122 | 110 | private final long maxWeightBytes; |
@@ -144,9 +132,6 @@ public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) { |
144 | 132 | */ |
145 | 133 | // visible for testing |
146 | 134 | DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor, LongSupplier relativeNanoTimeProvider) { |
147 | | - final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); |
148 | | - this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock()); |
149 | | - this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock()); |
150 | 135 | this.cleanupExecutor = cleanupExecutor; |
151 | 136 |
|
152 | 137 | final TimeValue ttl = CACHE_TTL_SETTING.get(settings); |
@@ -183,25 +168,19 @@ private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notific |
183 | 168 | // If the cacheKey isn't in the lookup map, then there's nothing to synchronize |
184 | 169 | return; |
185 | 170 | } |
186 | | - // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is |
187 | | - // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore |
188 | | - // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write lock ("eviction"), but we |
189 | | - // need to acquire that lock here. |
190 | 171 | cleanupExecutor.submit(() -> { |
191 | | - try (ReleasableLock ignoredLock = cacheEvictionLock.acquire()) { |
192 | | - // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check |
193 | | - if (bitsetCache.get(cacheKey) == null) { |
194 | | - // key is no longer in the cache, make sure it is no longer in the lookup map either. |
195 | | - keysByIndex.compute(indexKey, (ignored, keys) -> { |
196 | | - if (keys != null) { |
197 | | - keys.remove(cacheKey); |
198 | | - if (keys.isEmpty()) { |
199 | | - keys = null; |
200 | | - } |
| 172 | + // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check |
| 173 | + if (bitsetCache.get(cacheKey) == null) { |
| 174 | + // key is no longer in the cache, make sure it is no longer in the lookup map either. |
| 175 | + keysByIndex.compute(indexKey, (ignored, keys) -> { |
| 176 | + if (keys != null) { |
| 177 | + keys.remove(cacheKey); |
| 178 | + if (keys.isEmpty()) { |
| 179 | + keys = null; |
201 | 180 | } |
202 | | - return keys; |
203 | | - }); |
204 | | - } |
| 181 | + } |
| 182 | + return keys; |
| 183 | + }); |
205 | 184 | } |
206 | 185 | }); |
207 | 186 | } |
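
For reference, here is a minimal, self-contained sketch of the check-then-clean-up pattern that the simplified removal listener above relies on, using a plain `ConcurrentHashMap` to stand in for both the bitset cache and the `keysByIndex` reverse lookup. The class and field names below are illustrative only, not the actual Elasticsearch types; the point is that `ConcurrentHashMap.compute` runs its remapping function atomically per entry, which is the same guarantee the diff's own comment cites for insertions into `keysByIndex`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the cache + reverse-index pair; not the real classes.
public class ReverseIndexCleanupSketch {

    // Stand-in for bitsetCache: the authoritative cache of values.
    private final Map<String, Object> liveCache = new ConcurrentHashMap<>();
    // Stand-in for keysByIndex: maps an index key to the set of cache keys built for it.
    private final Map<String, Set<String>> keysByIndex = new ConcurrentHashMap<>();

    // Called from the eviction listener, typically on a background cleanup executor.
    void onEviction(String indexKey, String cacheKey) {
        // The key may have been re-populated immediately after eviction, so re-check
        // the live cache before touching the reverse index.
        if (liveCache.get(cacheKey) == null) {
            // compute(...) runs its remapping function atomically for this map entry,
            // so a concurrent insert or removal cannot interleave with this update.
            keysByIndex.compute(indexKey, (ignored, keys) -> {
                if (keys == null) {
                    return null;
                }
                keys.remove(cacheKey);
                return keys.isEmpty() ? null : keys;
            });
        }
    }
}
```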
@@ -255,48 +234,46 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro |
255 | 234 | final IndexReader.CacheKey indexKey = coreCacheHelper.getKey(); |
256 | 235 | final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query); |
257 | 236 |
|
258 | | - try (ReleasableLock ignoredLock = cacheModificationLock.acquire()) { |
259 | | - final boolean[] cacheKeyWasPresent = new boolean[] { true }; |
260 | | - final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> { |
261 | | - cacheKeyWasPresent[0] = false; |
262 | | - // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees. |
263 | | - keysByIndex.compute(indexKey, (ignore2, keys) -> { |
264 | | - if (keys == null) { |
265 | | - keys = ConcurrentCollections.newConcurrentSet(); |
266 | | - } |
267 | | - keys.add(cacheKey); |
268 | | - return keys; |
269 | | - }); |
270 | | - final BitSet result = computeBitSet(query, context); |
271 | | - if (result == null) { |
272 | | - // A cache loader is not allowed to return null, return a marker object instead. |
273 | | - return NULL_MARKER; |
274 | | - } |
275 | | - final long bitSetBytes = result.ramBytesUsed(); |
276 | | - if (bitSetBytes > this.maxWeightBytes) { |
277 | | - logger.warn( |
278 | | - "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;" |
279 | | - + " this object cannot be cached and will need to be rebuilt for each use;" |
280 | | - + " consider increasing the value of [{}]", |
281 | | - bitSetBytes, |
282 | | - maxWeightBytes, |
283 | | - CACHE_SIZE_SETTING.getKey() |
284 | | - ); |
285 | | - } else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) { |
286 | | - maybeLogCacheFullWarning(); |
| 237 | + final boolean[] cacheKeyWasPresent = new boolean[] { true }; |
| 238 | + final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> { |
| 239 | + cacheKeyWasPresent[0] = false; |
| 240 | + // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees. |
| 241 | + keysByIndex.compute(indexKey, (ignore2, keys) -> { |
| 242 | + if (keys == null) { |
| 243 | + keys = ConcurrentCollections.newConcurrentSet(); |
287 | 244 | } |
288 | | - return result; |
| 245 | + keys.add(cacheKey); |
| 246 | + return keys; |
289 | 247 | }); |
290 | | - if (cacheKeyWasPresent[0]) { |
291 | | - hitsTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart); |
292 | | - } else { |
293 | | - missesTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart); |
| 248 | + final BitSet result = computeBitSet(query, context); |
| 249 | + if (result == null) { |
| 250 | + // A cache loader is not allowed to return null, return a marker object instead. |
| 251 | + return NULL_MARKER; |
294 | 252 | } |
295 | | - if (bitSet == NULL_MARKER) { |
296 | | - return null; |
297 | | - } else { |
298 | | - return bitSet; |
| 253 | + final long bitSetBytes = result.ramBytesUsed(); |
| 254 | + if (bitSetBytes > this.maxWeightBytes) { |
| 255 | + logger.warn( |
| 256 | + "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;" |
| 257 | + + " this object cannot be cached and will need to be rebuilt for each use;" |
| 258 | + + " consider increasing the value of [{}]", |
| 259 | + bitSetBytes, |
| 260 | + maxWeightBytes, |
| 261 | + CACHE_SIZE_SETTING.getKey() |
| 262 | + ); |
| 263 | + } else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) { |
| 264 | + maybeLogCacheFullWarning(); |
299 | 265 | } |
| 266 | + return result; |
| 267 | + }); |
| 268 | + if (cacheKeyWasPresent[0]) { |
| 269 | + hitsTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart); |
| 270 | + } else { |
| 271 | + missesTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart); |
| 272 | + } |
| 273 | + if (bitSet == NULL_MARKER) { |
| 274 | + return null; |
| 275 | + } else { |
| 276 | + return bitSet; |
300 | 277 | } |
301 | 278 | } |
302 | 279 |
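
As a closing note, the two idioms that remain in `getBitSet` after this change — the `NULL_MARKER` sentinel for loaders that produce `null`, and the one-element `boolean` array used to detect whether `computeIfAbsent` actually ran the loader — can be sketched in isolation as below. This is a hypothetical illustration using a plain `ConcurrentHashMap` that counts hits and misses rather than timing them; the class and field names are not the real ones, and the real code uses the Elasticsearch `Cache` class rather than a raw map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

// Hypothetical sketch; illustrates the sentinel and hit/miss-tracking idioms only.
public class SentinelCacheSketch<K> {

    // A loader is not allowed to return null, so a distinct sentinel object is cached instead.
    private static final Object NULL_MARKER = new Object();

    private final Map<K, Object> cache = new ConcurrentHashMap<>();
    private final LongAdder hits = new LongAdder();
    private final LongAdder misses = new LongAdder();

    public Object get(K key, Function<K, Object> loader) {
        // A one-element array lets the lambda record whether the loader actually ran (a miss).
        final boolean[] keyWasPresent = new boolean[] { true };
        final Object cached = cache.computeIfAbsent(key, k -> {
            keyWasPresent[0] = false;
            final Object computed = loader.apply(k);
            // Translate a null result into the sentinel so it can be stored.
            return computed == null ? NULL_MARKER : computed;
        });
        if (keyWasPresent[0]) {
            hits.increment();
        } else {
            misses.increment();
        }
        // Translate the sentinel back to null for callers.
        return cached == NULL_MARKER ? null : cached;
    }
}
```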