3131import org .elasticsearch .common .settings .Settings ;
3232import org .elasticsearch .common .unit .ByteSizeValue ;
3333import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
34- import org .elasticsearch .common .util .concurrent .ReleasableLock ;
3534import org .elasticsearch .core .Nullable ;
3635import org .elasticsearch .core .TimeValue ;
3736import org .elasticsearch .lucene .util .BitSets ;
3837import org .elasticsearch .lucene .util .MatchAllBitSet ;
39- import org .elasticsearch .threadpool .ThreadPool ;
4038
4139import java .io .Closeable ;
4240import java .io .IOException ;
4341import java .util .List ;
4442import java .util .Map ;
4543import java .util .Objects ;
46- import java .util .Optional ;
4744import java .util .Set ;
4845import java .util .concurrent .ConcurrentHashMap ;
4946import java .util .concurrent .ExecutionException ;
50- import java .util .concurrent .ExecutorService ;
5147import java .util .concurrent .TimeUnit ;
5248import java .util .concurrent .atomic .AtomicLong ;
53- import java .util .concurrent .locks .ReentrantReadWriteLock ;
5449
5550/**
5651 * This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
8176 */
8277public final class DocumentSubsetBitsetCache implements IndexReader .ClosedListener , Closeable , Accountable {
8378
79+ private static final Logger logger = LogManager .getLogger (DocumentSubsetBitsetCache .class );
80+
8481 /**
8582 * The TTL defaults to 2 hours. We default to a large cache size ({@link #CACHE_SIZE_SETTING}), and aggressively
8683 * expire unused entries so that the cache does not hold on to memory unnecessarily.
@@ -102,40 +99,15 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
10299
103100 private static final BitSet NULL_MARKER = new FixedBitSet (0 );
104101
105- private static final Logger logger = LogManager .getLogger (DocumentSubsetBitsetCache .class );
106-
107- /**
108- * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
109- * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
110- * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
111- * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
112- * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
113- * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
114- */
115- private final ReleasableLock cacheEvictionLock ;
116- private final ReleasableLock cacheModificationLock ;
117- private final ExecutorService cleanupExecutor ;
118-
119102 private final long maxWeightBytes ;
120103 private final Cache <BitsetCacheKey , BitSet > bitsetCache ;
121104 private final Map <IndexReader .CacheKey , Set <BitsetCacheKey >> keysByIndex ;
122105 private final AtomicLong cacheFullWarningTime ;
123106
124- public DocumentSubsetBitsetCache (Settings settings , ThreadPool threadPool ) {
125- this (settings , threadPool .executor (ThreadPool .Names .GENERIC ));
126- }
127-
128107 /**
129108 * @param settings The global settings object for this node
130- * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
131- * it is sometimes necessary to run an asynchronous task to synchronize the internal state.
132109 */
133- protected DocumentSubsetBitsetCache (Settings settings , ExecutorService cleanupExecutor ) {
134- final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
135- this .cacheEvictionLock = new ReleasableLock (readWriteLock .writeLock ());
136- this .cacheModificationLock = new ReleasableLock (readWriteLock .readLock ());
137- this .cleanupExecutor = cleanupExecutor ;
138-
110+ public DocumentSubsetBitsetCache (Settings settings ) {
139111 final TimeValue ttl = CACHE_TTL_SETTING .get (settings );
140112 this .maxWeightBytes = CACHE_SIZE_SETTING .get (settings ).getBytes ();
141113 this .bitsetCache = CacheBuilder .<BitsetCacheKey , BitSet >builder ()
@@ -150,8 +122,8 @@ protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupEx
150122 }
151123
152124 @ Override
153- public void onClose (IndexReader .CacheKey ownerCoreCacheKey ) {
154- final Set <BitsetCacheKey > keys = keysByIndex .remove (ownerCoreCacheKey );
125+ public void onClose (IndexReader .CacheKey indexKey ) {
126+ final Set <BitsetCacheKey > keys = keysByIndex .remove (indexKey );
155127 if (keys != null ) {
156128 // Because this Set has been removed from the map, and the only update to the set is performed in a
157129 // Map#compute call, it should not be possible to get a concurrent modification here.
@@ -163,24 +135,17 @@ public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
163135 * Cleanup (synchronize) the internal state when an object is removed from the primary cache
164136 */
165137 private void onCacheEviction (RemovalNotification <BitsetCacheKey , BitSet > notification ) {
166- final BitsetCacheKey bitsetKey = notification .getKey ();
167- final IndexReader .CacheKey indexKey = bitsetKey .index ;
168- if (keysByIndex .getOrDefault (indexKey , Set .of ()).contains (bitsetKey ) == false ) {
169- // If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
170- return ;
171- }
172- // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
173- // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
174- // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
175- // need to acquire that lock here.
176- cleanupExecutor .submit (() -> {
177- try (ReleasableLock ignored = cacheEvictionLock .acquire ()) {
178- // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
179- if (bitsetCache .get (bitsetKey ) == null ) {
180- // key is no longer in the cache, make sure it is no longer in the lookup map either.
181- Optional .ofNullable (keysByIndex .get (indexKey )).ifPresent (set -> set .remove (bitsetKey ));
182- }
183- }
138+ final BitsetCacheKey cacheKey = notification .getKey ();
139+ final IndexReader .CacheKey indexKey = cacheKey .indexKey ;
140+ // the key is *probably* no longer in the cache, so make sure it is no longer in the lookup map.
141+ // note: rather than locking (which destroys our throughput), we're erring on the side of tidying the keysByIndex
142+ // structure even if some other racing thread has already added a new bitset into the cache for this same key.
143+ // the keysByIndex structure is used in onClose (our notification from lucene that a segment has become inaccessible),
144+ // so we might end up failing to *eagerly* invalidate a bitset -- the consequence of that would be temporarily higher
145+ // memory use (the bitset will not be accessed, and it will still be invalidated eventually for size or ttl reasons).
146+ keysByIndex .computeIfPresent (indexKey , (ignored , keys ) -> {
147+ keys .remove (cacheKey );
148+ return keys .isEmpty () ? null : keys ;
184149 });
185150 }
186151
@@ -231,41 +196,39 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
231196 final IndexReader .CacheKey indexKey = coreCacheHelper .getKey ();
232197 final BitsetCacheKey cacheKey = new BitsetCacheKey (indexKey , query );
233198
234- try (ReleasableLock ignored = cacheModificationLock .acquire ()) {
235- final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
236- // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
237- keysByIndex .compute (indexKey , (ignore2 , set ) -> {
238- if (set == null ) {
239- set = ConcurrentCollections .newConcurrentSet ();
240- }
241- set .add (cacheKey );
242- return set ;
243- });
244- final BitSet result = computeBitSet (query , context );
245- if (result == null ) {
246- // A cache loader is not allowed to return null, return a marker object instead.
247- return NULL_MARKER ;
248- }
249- final long bitSetBytes = result .ramBytesUsed ();
250- if (bitSetBytes > this .maxWeightBytes ) {
251- logger .warn (
252- "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
253- + " this object cannot be cached and will need to be rebuilt for each use;"
254- + " consider increasing the value of [{}]" ,
255- bitSetBytes ,
256- maxWeightBytes ,
257- CACHE_SIZE_SETTING .getKey ()
258- );
259- } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
260- maybeLogCacheFullWarning ();
199+ final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
200+ // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
201+ keysByIndex .compute (indexKey , (ignore2 , keys ) -> {
202+ if (keys == null ) {
203+ keys = ConcurrentCollections .newConcurrentSet ();
261204 }
262- return result ;
205+ keys .add (cacheKey );
206+ return keys ;
263207 });
264- if ( bitSet == NULL_MARKER ) {
265- return null ;
266- } else {
267- return bitSet ;
208+ final BitSet result = computeBitSet ( query , context );
209+ if ( result == null ) {
210+ // A cache loader is not allowed to return null, return a marker object instead.
211+ return NULL_MARKER ;
268212 }
213+ final long bitSetBytes = result .ramBytesUsed ();
214+ if (bitSetBytes > this .maxWeightBytes ) {
215+ logger .warn (
216+ "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
217+ + " this object cannot be cached and will need to be rebuilt for each use;"
218+ + " consider increasing the value of [{}]" ,
219+ bitSetBytes ,
220+ maxWeightBytes ,
221+ CACHE_SIZE_SETTING .getKey ()
222+ );
223+ } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
224+ maybeLogCacheFullWarning ();
225+ }
226+ return result ;
227+ });
228+ if (bitSet == NULL_MARKER ) {
229+ return null ;
230+ } else {
231+ return bitSet ;
269232 }
270233 }
271234
@@ -323,11 +286,11 @@ public Map<String, Object> usageStats() {
323286 }
324287
325288 private static class BitsetCacheKey {
326- final IndexReader .CacheKey index ;
289+ final IndexReader .CacheKey indexKey ;
327290 final Query query ;
328291
// An entry is uniquely identified by the pair (index segment cache key, query).
private BitsetCacheKey(IndexReader.CacheKey indexKey, Query query) {
    this.indexKey = indexKey;
    this.query = query;
}
333296
@@ -340,41 +303,59 @@ public boolean equals(Object other) {
340303 return false ;
341304 }
342305 final BitsetCacheKey that = (BitsetCacheKey ) other ;
343- return Objects .equals (this .index , that .index ) && Objects .equals (this .query , that .query );
306+ return Objects .equals (this .indexKey , that .indexKey ) && Objects .equals (this .query , that .query );
344307 }
345308
@Override
public int hashCode() {
    // Must combine exactly the fields that equals() compares: (indexKey, query).
    return Objects.hash(indexKey, query);
}
350313
351314 @ Override
352315 public String toString () {
353- return getClass ().getSimpleName () + "(" + index + "," + query + ")" ;
316+ return getClass ().getSimpleName () + "(" + indexKey + "," + query + ")" ;
354317 }
355318 }
356319
/**
 * This test-only method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent
 * with one another.
 */
// visible for testing
void verifyInternalConsistency() {
    // Check both directions: every cached bitset must be tracked in keysByIndex,
    // and every tracked key must still resolve to a live cache entry.
    verifyInternalConsistencyCacheToKeys();
    verifyInternalConsistencyKeysToCache();
}
329+
330+ /**
331+ * This test-only method iterates over the {@link #bitsetCache} and checks that {@link #keysByIndex} is consistent with it.
332+ */
333+ // visible for testing
334+ void verifyInternalConsistencyCacheToKeys () {
335+ bitsetCache .keys ().forEach (cacheKey -> {
336+ final Set <BitsetCacheKey > keys = keysByIndex .get (cacheKey .indexKey );
337+ if (keys == null || keys .contains (cacheKey ) == false ) {
370338 throw new IllegalStateException (
371- "Key [" + bck + "] is in the cache, but the lookup entry for [" + bck . index + "] does not contain that key"
339+ "Key [" + cacheKey + "] is in the cache, but the lookup entry for [" + cacheKey . indexKey + "] does not contain that key"
372340 );
373341 }
374342 });
375- this .keysByIndex .values ().stream ().flatMap (Set ::stream ).forEach (bck -> {
376- if (this .bitsetCache .get (bck ) == null ) {
377- throw new IllegalStateException ("Key [" + bck + "] is in the lookup map, but is not in the cache" );
343+ }
344+
345+ /**
346+ * This test-only method iterates over the {@link #keysByIndex} and checks that {@link #bitsetCache} is consistent with it.
347+ */
348+ // visible for testing
349+ void verifyInternalConsistencyKeysToCache () {
350+ keysByIndex .forEach ((indexKey , keys ) -> {
351+ if (keys == null || keys .isEmpty ()) {
352+ throw new IllegalStateException ("The lookup entry for [" + indexKey + "] is null or empty" );
353+ } else {
354+ keys .forEach (cacheKey -> {
355+ if (bitsetCache .get (cacheKey ) == null ) {
356+ throw new IllegalStateException ("Key [" + cacheKey + "] is in the lookup map, but is not in the cache" );
357+ }
358+ });
378359 }
379360 });
380361 }
0 commit comments