31
31
import org .elasticsearch .common .settings .Settings ;
32
32
import org .elasticsearch .common .unit .ByteSizeValue ;
33
33
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
34
- import org .elasticsearch .common .util .concurrent .ReleasableLock ;
35
34
import org .elasticsearch .core .Nullable ;
36
35
import org .elasticsearch .core .TimeValue ;
37
36
import org .elasticsearch .lucene .util .BitSets ;
38
37
import org .elasticsearch .lucene .util .MatchAllBitSet ;
39
- import org .elasticsearch .threadpool .ThreadPool ;
40
38
41
39
import java .io .Closeable ;
42
40
import java .io .IOException ;
43
41
import java .util .List ;
44
42
import java .util .Map ;
45
43
import java .util .Objects ;
46
- import java .util .Optional ;
47
44
import java .util .Set ;
48
45
import java .util .concurrent .ConcurrentHashMap ;
49
46
import java .util .concurrent .ExecutionException ;
50
- import java .util .concurrent .ExecutorService ;
51
47
import java .util .concurrent .TimeUnit ;
52
48
import java .util .concurrent .atomic .AtomicLong ;
53
- import java .util .concurrent .locks .ReentrantReadWriteLock ;
54
49
55
50
/**
56
51
* This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
81
76
*/
82
77
public final class DocumentSubsetBitsetCache implements IndexReader .ClosedListener , Closeable , Accountable {
83
78
79
+ private static final Logger logger = LogManager .getLogger (DocumentSubsetBitsetCache .class );
80
+
84
81
/**
85
82
* The TTL defaults to 2 hours. We default to a large cache size ({@link #CACHE_SIZE_SETTING}), and aggressively
86
83
* expire unused entries so that the cache does not hold on to memory unnecessarily.
@@ -102,40 +99,15 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
102
99
103
100
private static final BitSet NULL_MARKER = new FixedBitSet (0 );
104
101
105
- private static final Logger logger = LogManager .getLogger (DocumentSubsetBitsetCache .class );
106
-
107
- /**
108
- * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
109
- * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
110
- * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
111
- * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
112
- * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
113
- * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
114
- */
115
- private final ReleasableLock cacheEvictionLock ;
116
- private final ReleasableLock cacheModificationLock ;
117
- private final ExecutorService cleanupExecutor ;
118
-
119
102
private final long maxWeightBytes ;
120
103
private final Cache <BitsetCacheKey , BitSet > bitsetCache ;
121
104
private final Map <IndexReader .CacheKey , Set <BitsetCacheKey >> keysByIndex ;
122
105
private final AtomicLong cacheFullWarningTime ;
123
106
124
- public DocumentSubsetBitsetCache (Settings settings , ThreadPool threadPool ) {
125
- this (settings , threadPool .executor (ThreadPool .Names .GENERIC ));
126
- }
127
-
128
107
/**
129
108
* @param settings The global settings object for this node
130
- * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
131
- * it is sometimes necessary to run an asynchronous task to synchronize the internal state.
132
109
*/
133
- protected DocumentSubsetBitsetCache (Settings settings , ExecutorService cleanupExecutor ) {
134
- final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
135
- this .cacheEvictionLock = new ReleasableLock (readWriteLock .writeLock ());
136
- this .cacheModificationLock = new ReleasableLock (readWriteLock .readLock ());
137
- this .cleanupExecutor = cleanupExecutor ;
138
-
110
+ public DocumentSubsetBitsetCache (Settings settings ) {
139
111
final TimeValue ttl = CACHE_TTL_SETTING .get (settings );
140
112
this .maxWeightBytes = CACHE_SIZE_SETTING .get (settings ).getBytes ();
141
113
this .bitsetCache = CacheBuilder .<BitsetCacheKey , BitSet >builder ()
@@ -150,8 +122,8 @@ protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupEx
150
122
}
151
123
152
124
@ Override
153
- public void onClose (IndexReader .CacheKey ownerCoreCacheKey ) {
154
- final Set <BitsetCacheKey > keys = keysByIndex .remove (ownerCoreCacheKey );
125
+ public void onClose (IndexReader .CacheKey indexKey ) {
126
+ final Set <BitsetCacheKey > keys = keysByIndex .remove (indexKey );
155
127
if (keys != null ) {
156
128
// Because this Set has been removed from the map, and the only update to the set is performed in a
157
129
// Map#compute call, it should not be possible to get a concurrent modification here.
@@ -163,24 +135,17 @@ public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
163
135
* Cleanup (synchronize) the internal state when an object is removed from the primary cache
164
136
*/
165
137
private void onCacheEviction (RemovalNotification <BitsetCacheKey , BitSet > notification ) {
166
- final BitsetCacheKey bitsetKey = notification .getKey ();
167
- final IndexReader .CacheKey indexKey = bitsetKey .index ;
168
- if (keysByIndex .getOrDefault (indexKey , Set .of ()).contains (bitsetKey ) == false ) {
169
- // If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
170
- return ;
171
- }
172
- // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
173
- // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
174
- // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
175
- // need to acquire that lock here.
176
- cleanupExecutor .submit (() -> {
177
- try (ReleasableLock ignored = cacheEvictionLock .acquire ()) {
178
- // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
179
- if (bitsetCache .get (bitsetKey ) == null ) {
180
- // key is no longer in the cache, make sure it is no longer in the lookup map either.
181
- Optional .ofNullable (keysByIndex .get (indexKey )).ifPresent (set -> set .remove (bitsetKey ));
182
- }
183
- }
138
+ final BitsetCacheKey cacheKey = notification .getKey ();
139
+ final IndexReader .CacheKey indexKey = cacheKey .indexKey ;
140
+ // the key is *probably* no longer in the cache, so make sure it is no longer in the lookup map.
141
+ // note: rather than locking (which destroys our throughput), we're erring on the side of tidying the keysByIndex
142
+ // structure even if some other racing thread has already added a new bitset into the cache for this same key.
143
+ // the keysByIndex structure is used in onClose (our notification from lucene that a segment has become inaccessible),
144
+ // so we might end up failing to *eagerly* invalidate a bitset -- the consequence of that would be temporarily higher
145
+ // memory use (the bitset will not be accessed, and it will still be invalidated eventually for size or ttl reasons).
146
+ keysByIndex .computeIfPresent (indexKey , (ignored , keys ) -> {
147
+ keys .remove (cacheKey );
148
+ return keys .isEmpty () ? null : keys ;
184
149
});
185
150
}
186
151
@@ -231,41 +196,39 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
231
196
final IndexReader .CacheKey indexKey = coreCacheHelper .getKey ();
232
197
final BitsetCacheKey cacheKey = new BitsetCacheKey (indexKey , query );
233
198
234
- try (ReleasableLock ignored = cacheModificationLock .acquire ()) {
235
- final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
236
- // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
237
- keysByIndex .compute (indexKey , (ignore2 , set ) -> {
238
- if (set == null ) {
239
- set = ConcurrentCollections .newConcurrentSet ();
240
- }
241
- set .add (cacheKey );
242
- return set ;
243
- });
244
- final BitSet result = computeBitSet (query , context );
245
- if (result == null ) {
246
- // A cache loader is not allowed to return null, return a marker object instead.
247
- return NULL_MARKER ;
248
- }
249
- final long bitSetBytes = result .ramBytesUsed ();
250
- if (bitSetBytes > this .maxWeightBytes ) {
251
- logger .warn (
252
- "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
253
- + " this object cannot be cached and will need to be rebuilt for each use;"
254
- + " consider increasing the value of [{}]" ,
255
- bitSetBytes ,
256
- maxWeightBytes ,
257
- CACHE_SIZE_SETTING .getKey ()
258
- );
259
- } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
260
- maybeLogCacheFullWarning ();
199
+ final BitSet bitSet = bitsetCache .computeIfAbsent (cacheKey , ignore1 -> {
200
+ // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
201
+ keysByIndex .compute (indexKey , (ignore2 , keys ) -> {
202
+ if (keys == null ) {
203
+ keys = ConcurrentCollections .newConcurrentSet ();
261
204
}
262
- return result ;
205
+ keys .add (cacheKey );
206
+ return keys ;
263
207
});
264
- if ( bitSet == NULL_MARKER ) {
265
- return null ;
266
- } else {
267
- return bitSet ;
208
+ final BitSet result = computeBitSet ( query , context );
209
+ if ( result == null ) {
210
+ // A cache loader is not allowed to return null, return a marker object instead.
211
+ return NULL_MARKER ;
268
212
}
213
+ final long bitSetBytes = result .ramBytesUsed ();
214
+ if (bitSetBytes > this .maxWeightBytes ) {
215
+ logger .warn (
216
+ "built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
217
+ + " this object cannot be cached and will need to be rebuilt for each use;"
218
+ + " consider increasing the value of [{}]" ,
219
+ bitSetBytes ,
220
+ maxWeightBytes ,
221
+ CACHE_SIZE_SETTING .getKey ()
222
+ );
223
+ } else if (bitSetBytes + bitsetCache .weight () > maxWeightBytes ) {
224
+ maybeLogCacheFullWarning ();
225
+ }
226
+ return result ;
227
+ });
228
+ if (bitSet == NULL_MARKER ) {
229
+ return null ;
230
+ } else {
231
+ return bitSet ;
269
232
}
270
233
}
271
234
@@ -323,11 +286,11 @@ public Map<String, Object> usageStats() {
323
286
}
324
287
325
288
private static class BitsetCacheKey {
326
- final IndexReader .CacheKey index ;
289
+ final IndexReader .CacheKey indexKey ;
327
290
final Query query ;
328
291
329
- private BitsetCacheKey (IndexReader .CacheKey index , Query query ) {
330
- this .index = index ;
292
+ private BitsetCacheKey (IndexReader .CacheKey indexKey , Query query ) {
293
+ this .indexKey = indexKey ;
331
294
this .query = query ;
332
295
}
333
296
@@ -340,41 +303,59 @@ public boolean equals(Object other) {
340
303
return false ;
341
304
}
342
305
final BitsetCacheKey that = (BitsetCacheKey ) other ;
343
- return Objects .equals (this .index , that .index ) && Objects .equals (this .query , that .query );
306
+ return Objects .equals (this .indexKey , that .indexKey ) && Objects .equals (this .query , that .query );
344
307
}
345
308
346
309
@ Override
347
310
public int hashCode () {
348
- return Objects .hash (index , query );
311
+ return Objects .hash (indexKey , query );
349
312
}
350
313
351
314
@ Override
352
315
public String toString () {
353
- return getClass ().getSimpleName () + "(" + index + "," + query + ")" ;
316
+ return getClass ().getSimpleName () + "(" + indexKey + "," + query + ")" ;
354
317
}
355
318
}
356
319
357
320
/**
358
- * This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
359
- * another. This method is only called by tests .
321
+ * This test-only method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent
322
+ * with one another .
360
323
*/
324
+ // visible for testing
361
325
void verifyInternalConsistency () {
362
- this .bitsetCache .keys ().forEach (bck -> {
363
- final Set <BitsetCacheKey > set = this .keysByIndex .get (bck .index );
364
- if (set == null ) {
365
- throw new IllegalStateException (
366
- "Key [" + bck + "] is in the cache, but there is no entry for [" + bck .index + "] in the lookup map"
367
- );
368
- }
369
- if (set .contains (bck ) == false ) {
326
+ verifyInternalConsistencyCacheToKeys ();
327
+ verifyInternalConsistencyKeysToCache ();
328
+ }
329
+
330
+ /**
331
+ * This test-only method iterates over the {@link #bitsetCache} and checks that {@link #keysByIndex} is consistent with it.
332
+ */
333
+ // visible for testing
334
+ void verifyInternalConsistencyCacheToKeys () {
335
+ bitsetCache .keys ().forEach (cacheKey -> {
336
+ final Set <BitsetCacheKey > keys = keysByIndex .get (cacheKey .indexKey );
337
+ if (keys == null || keys .contains (cacheKey ) == false ) {
370
338
throw new IllegalStateException (
371
- "Key [" + bck + "] is in the cache, but the lookup entry for [" + bck . index + "] does not contain that key"
339
+ "Key [" + cacheKey + "] is in the cache, but the lookup entry for [" + cacheKey . indexKey + "] does not contain that key"
372
340
);
373
341
}
374
342
});
375
- this .keysByIndex .values ().stream ().flatMap (Set ::stream ).forEach (bck -> {
376
- if (this .bitsetCache .get (bck ) == null ) {
377
- throw new IllegalStateException ("Key [" + bck + "] is in the lookup map, but is not in the cache" );
343
+ }
344
+
345
+ /**
346
+ * This test-only method iterates over the {@link #keysByIndex} and checks that {@link #bitsetCache} is consistent with it.
347
+ */
348
+ // visible for testing
349
+ void verifyInternalConsistencyKeysToCache () {
350
+ keysByIndex .forEach ((indexKey , keys ) -> {
351
+ if (keys == null || keys .isEmpty ()) {
352
+ throw new IllegalStateException ("The lookup entry for [" + indexKey + "] is null or empty" );
353
+ } else {
354
+ keys .forEach (cacheKey -> {
355
+ if (bitsetCache .get (cacheKey ) == null ) {
356
+ throw new IllegalStateException ("Key [" + cacheKey + "] is in the lookup map, but is not in the cache" );
357
+ }
358
+ });
378
359
}
379
360
});
380
361
}
0 commit comments