3131import org .elasticsearch .common .settings .Settings ;
3232import org .elasticsearch .common .unit .ByteSizeValue ;
3333import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
34- import org .elasticsearch .common .util .concurrent .ReleasableLock ;
3534import org .elasticsearch .core .Nullable ;
3635import org .elasticsearch .core .TimeValue ;
3736import org .elasticsearch .lucene .util .BitSets ;
3837import org .elasticsearch .lucene .util .MatchAllBitSet ;
39- import org .elasticsearch .threadpool .ThreadPool ;
4038
4139import java .io .Closeable ;
4240import java .io .IOException ;
4543import java .util .List ;
4644import java .util .Map ;
4745import java .util .Objects ;
48- import java .util .Optional ;
4946import java .util .Set ;
5047import java .util .concurrent .ConcurrentHashMap ;
5148import java .util .concurrent .ExecutionException ;
52- import java .util .concurrent .ExecutorService ;
5349import java .util .concurrent .TimeUnit ;
5450import java .util .concurrent .atomic .AtomicLong ;
5551import java .util .concurrent .atomic .LongAdder ;
56- import java .util .concurrent .locks .ReentrantReadWriteLock ;
5752import java .util .function .LongSupplier ;
5853
5954/**
@@ -108,18 +103,6 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
108103
109104 private static final BitSet NULL_MARKER = new FixedBitSet (0 );
110105
111- /**
112- * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
113- * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
114- * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
115- * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
116- * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
117- * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
118- */
119- private final ReleasableLock cacheEvictionLock ;
120- private final ReleasableLock cacheModificationLock ;
121- private final ExecutorService cleanupExecutor ;
122-
123106 private final long maxWeightBytes ;
124107 private final Cache <BitsetCacheKey , BitSet > bitsetCache ;
125108 private final Map <IndexReader .CacheKey , Set <BitsetCacheKey >> keysByIndex ;
@@ -128,28 +111,16 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
128111 private final LongAdder hitsTimeInNanos = new LongAdder ();
129112 private final LongAdder missesTimeInNanos = new LongAdder ();
130113
131- public DocumentSubsetBitsetCache (Settings settings , ThreadPool threadPool ) {
132- this (settings , threadPool .executor (ThreadPool .Names .GENERIC ));
133- }
134-
135- // visible for testing
136- DocumentSubsetBitsetCache (Settings settings , ExecutorService cleanupExecutor ) {
137- this (settings , cleanupExecutor , System ::nanoTime );
    /**
     * Builds the cache from the given node settings, using {@link System#nanoTime()} as the
     * relative-time source for the hit/miss timing statistics.
     *
     * @param settings The global settings object for this node
     */
    public DocumentSubsetBitsetCache(Settings settings) {
        this(settings, System::nanoTime);
    }
139117
140118 /**
141119 * @param settings The global settings object for this node
142- * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
143- * it is sometimes necessary to run an asynchronous task to synchronize the internal state.
144120 * @param relativeNanoTimeProvider Provider of nanos for code that needs to measure relative time.
145121 */
146122 // visible for testing
147- DocumentSubsetBitsetCache (Settings settings , ExecutorService cleanupExecutor , LongSupplier relativeNanoTimeProvider ) {
148- final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
149- this .cacheEvictionLock = new ReleasableLock (readWriteLock .writeLock ());
150- this .cacheModificationLock = new ReleasableLock (readWriteLock .readLock ());
151- this .cleanupExecutor = cleanupExecutor ;
152-
123+ DocumentSubsetBitsetCache (Settings settings , LongSupplier relativeNanoTimeProvider ) {
153124 final TimeValue ttl = CACHE_TTL_SETTING .get (settings );
154125 this .maxWeightBytes = CACHE_SIZE_SETTING .get (settings ).getBytes ();
155126 this .bitsetCache = CacheBuilder .<BitsetCacheKey , BitSet >builder ()
@@ -180,22 +151,15 @@ public void onClose(IndexReader.CacheKey indexKey) {
    /**
     * Invoked when an entry is removed from {@link #bitsetCache}; keeps the {@link #keysByIndex}
     * lookup map in sync on a best-effort basis (see the note below on the deliberate lack of locking).
     *
     * @param notification carries the {@link BitsetCacheKey} of the removed cache entry
     */
    private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
        final BitsetCacheKey cacheKey = notification.getKey();
        final IndexReader.CacheKey indexKey = cacheKey.indexKey;
        // the key is *probably* no longer in the cache, so make sure it is no longer in the lookup map.
        // note: rather than locking (which destroys our throughput), we're erring on the side of tidying the keysByIndex
        // structure even if some other racing thread has already added a new bitset into the cache for this same key.
        // the keysByIndex structure is used in onClose (our notification from lucene that a segment has become inaccessible),
        // so we might end up failing to *eagerly* invalidate a bitset -- the consequence of that would be temporarily higher
        // memory use (the bitset will not be accessed, and it will still be invalidated eventually for size or ttl reasons).
        keysByIndex.computeIfPresent(indexKey, (ignored, keys) -> {
            keys.remove(cacheKey);
            // returning null from computeIfPresent removes the (now empty) mapping entirely
            return keys.isEmpty() ? null : keys;
        });
    }
201165
@@ -248,48 +212,46 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
248212 final IndexReader .CacheKey indexKey = coreCacheHelper .getKey ();
249213 final BitsetCacheKey cacheKey = new BitsetCacheKey (indexKey , query );
250214
251- try (ReleasableLock ignored = cacheModificationLock .acquire ()) {
252- final boolean [] cacheKeyWasPresent = new boolean [] { true };
253- final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
254- cacheKeyWasPresent [0 ] = false ;
255- // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
256- keysByIndex .compute (indexKey , (ignore2 , set ) -> {
257- if (set == null ) {
258- set = ConcurrentCollections .newConcurrentSet ();
259- }
260- set .add (cacheKey );
261- return set ;
262- });
263- final BitSet result = computeBitSet (query , context );
264- if (result == null ) {
265- // A cache loader is not allowed to return null, return a marker object instead.
266- return NULL_MARKER ;
215+ final boolean [] cacheKeyWasPresent = new boolean [] { true };
216+ final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
217+ cacheKeyWasPresent [0 ] = false ;
218+ // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
219+ keysByIndex .compute (indexKey , (ignore2 , keys ) -> {
220+ if (keys == null ) {
221+ keys = ConcurrentCollections .newConcurrentSet ();
267222 }
268- final long bitSetBytes = result .ramBytesUsed ();
269- if (bitSetBytes > this .maxWeightBytes ) {
270- logger .warn (
271- "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
272- + " this object cannot be cached and will need to be rebuilt for each use;"
273- + " consider increasing the value of [{}]" ,
274- bitSetBytes ,
275- maxWeightBytes ,
276- CACHE_SIZE_SETTING .getKey ()
277- );
278- } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
279- maybeLogCacheFullWarning ();
280- }
281- return result ;
223+ keys .add (cacheKey );
224+ return keys ;
282225 });
283- if ( cacheKeyWasPresent [ 0 ]) {
284- hitsTimeInNanos . add ( relativeNanoTimeProvider . getAsLong () - cacheStart );
285- } else {
286- missesTimeInNanos . add ( relativeNanoTimeProvider . getAsLong () - cacheStart ) ;
226+ final BitSet result = computeBitSet ( query , context );
227+ if ( result == null ) {
228+ // A cache loader is not allowed to return null, return a marker object instead.
229+ return NULL_MARKER ;
287230 }
288- if (bitSet == NULL_MARKER ) {
289- return null ;
290- } else {
291- return bitSet ;
231+ final long bitSetBytes = result .ramBytesUsed ();
232+ if (bitSetBytes > this .maxWeightBytes ) {
233+ logger .warn (
234+ "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
235+ + " this object cannot be cached and will need to be rebuilt for each use;"
236+ + " consider increasing the value of [{}]" ,
237+ bitSetBytes ,
238+ maxWeightBytes ,
239+ CACHE_SIZE_SETTING .getKey ()
240+ );
241+ } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
242+ maybeLogCacheFullWarning ();
292243 }
244+ return result ;
245+ });
246+ if (cacheKeyWasPresent [0 ]) {
247+ hitsTimeInNanos .add (relativeNanoTimeProvider .getAsLong () - cacheStart );
248+ } else {
249+ missesTimeInNanos .add (relativeNanoTimeProvider .getAsLong () - cacheStart );
250+ }
251+ if (bitSet == NULL_MARKER ) {
252+ return null ;
253+ } else {
254+ return bitSet ;
293255 }
294256 }
295257
@@ -402,26 +364,44 @@ public String toString() {
402364 }
403365
    /**
     * This test-only method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent
     * with one another.
     */
    // visible for testing
    void verifyInternalConsistency() {
        // check both directions: every cache entry is in the lookup map, and every lookup entry is in the cache
        verifyInternalConsistencyCacheToKeys();
        verifyInternalConsistencyKeysToCache();
    }
375+
    /**
     * This test-only method iterates over the {@link #bitsetCache} and checks that {@link #keysByIndex} is consistent with it.
     */
    // visible for testing
    void verifyInternalConsistencyCacheToKeys() {
        bitsetCache.keys().forEach(cacheKey -> {
            final Set<BitsetCacheKey> keys = keysByIndex.get(cacheKey.indexKey);
            // every key currently held by the cache must appear in the lookup entry for its index
            if (keys == null || keys.contains(cacheKey) == false) {
                throw new IllegalStateException(
                    "Key [" + cacheKey + "] is in the cache, but the lookup entry for [" + cacheKey.indexKey + "] does not contain that key"
                );
            }
        });
    }
390+
391+ /**
392+ * This test-only method iterates over the {@link #keysByIndex} and checks that {@link #bitsetCache} is consistent with it.
393+ */
394+ // visible for testing
395+ void verifyInternalConsistencyKeysToCache () {
396+ keysByIndex .forEach ((indexKey , keys ) -> {
397+ if (keys == null || keys .isEmpty ()) {
398+ throw new IllegalStateException ("The lookup entry for [" + indexKey + "] is null or empty" );
399+ } else {
400+ keys .forEach (cacheKey -> {
401+ if (bitsetCache .get (cacheKey ) == null ) {
402+ throw new IllegalStateException ("Key [" + cacheKey + "] is in the lookup map, but is not in the cache" );
403+ }
404+ });
425405 }
426406 });
427407 }
0 commit comments