Skip to content

Commit d6c1669

Browse files
authored
Merge branch 'master' into lh-improve-inflight-limiter
2 parents 302b671 + b6cfecc commit d6c1669

File tree

17 files changed

+671
-71
lines changed

17 files changed

+671
-71
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java

Lines changed: 86 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,39 @@ K getKey() {
106106
return localKey;
107107
}
108108

109-
V getValue(K key, boolean requireSameKeyInstance) {
109+
/**
110+
* Get the value associated with the key. Returns null if the key does not match the key.
111+
*
112+
* @param key the key to match
113+
* @return the value associated with the key, or null if the value has already been recycled or the key does not
114+
* match
115+
*/
116+
V getValue(K key) {
117+
return getValueInternal(key, false);
118+
}
119+
120+
/**
121+
* Get the value associated with the Map.Entry's key and value. Exact instance of the key is required to match.
122+
* @param entry the entry which contains the key and {@link EntryWrapper} value to get the value from
123+
* @return the value associated with the key, or null if the value has already been recycled or the key does not
124+
* exactly match the same instance
125+
*/
126+
static <K, V> V getValueMatchingMapEntry(Map.Entry<K, EntryWrapper<K, V>> entry) {
127+
return entry.getValue().getValueInternal(entry.getKey(), true);
128+
}
129+
130+
/**
131+
* Get the value associated with the key. Returns null if the key does not match the key associated with the
132+
* value.
133+
*
134+
* @param key the key to match
135+
* @param requireSameKeyInstance when true, the matching will be restricted to exactly the same instance of the
136+
* key as the one stored in the wrapper. This is used to avoid any races
137+
* when retrieving or removing the entries from the cache when the key and value
138+
* instances are available.
139+
* @return the value associated with the key, or null if the key does not match
140+
*/
141+
private V getValueInternal(K key, boolean requireSameKeyInstance) {
110142
long stamp = lock.tryOptimisticRead();
111143
K localKey = this.key;
112144
V localValue = this.value;
@@ -116,6 +148,11 @@ V getValue(K key, boolean requireSameKeyInstance) {
116148
localValue = this.value;
117149
lock.unlockRead(stamp);
118150
}
151+
152+
// check that the given key matches the key associated with the value in the entry
153+
// this is used to detect if the entry has already been recycled and contains another key
154+
// when requireSameKeyInstance is true, the key must be exactly the same instance as the one stored in the
155+
// entry to match
119156
if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) {
120157
return null;
121158
}
@@ -236,34 +273,45 @@ public boolean exists(Key key) {
236273
* The caller is responsible for releasing the reference.
237274
*/
238275
public Value get(Key key) {
239-
return getValue(key, entries.get(key), false);
276+
return getValueFromWrapper(key, entries.get(key));
240277
}
241278

242-
private Value getValue(Key key, EntryWrapper<Key, Value> valueWrapper, boolean requireSameKeyInstance) {
279+
private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> valueWrapper) {
243280
if (valueWrapper == null) {
244281
return null;
245282
} else {
246-
Value value = valueWrapper.getValue(key, requireSameKeyInstance);
247-
if (value == null) {
248-
// the wrapper has been recycled and contains another key
249-
return null;
250-
}
251-
try {
252-
value.retain();
253-
} catch (IllegalReferenceCountException e) {
254-
// Value was already deallocated
255-
return null;
256-
}
257-
// check that the value matches the key and that there's at least 2 references to it since
258-
// the cache should be holding one reference and a new reference was just added in this method
259-
if (value.refCnt() > 1 && value.matchesKey(key)) {
260-
return value;
261-
} else {
262-
// Value or IdentityWrapper was recycled and already contains another value
263-
// release the reference added in this method
264-
value.release();
265-
return null;
266-
}
283+
Value value = valueWrapper.getValue(key);
284+
return getRetainedValueMatchingKey(key, value);
285+
}
286+
}
287+
288+
private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry) {
289+
Value valueMatchingEntry = EntryWrapper.getValueMatchingMapEntry(entry);
290+
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
291+
}
292+
293+
// validates that the value matches the key and that the value has not been recycled
294+
// which are possible due to the lack of exclusive locks in the cache and the use of reference counted objects
295+
private Value getRetainedValueMatchingKey(Key key, Value value) {
296+
if (value == null) {
297+
// the wrapper has been recycled and contains another key
298+
return null;
299+
}
300+
try {
301+
value.retain();
302+
} catch (IllegalReferenceCountException e) {
303+
// Value was already deallocated
304+
return null;
305+
}
306+
// check that the value matches the key and that there's at least 2 references to it since
307+
// the cache should be holding one reference and a new reference was just added in this method
308+
if (value.refCnt() > 1 && value.matchesKey(key)) {
309+
return value;
310+
} else {
311+
// Value or IdentityWrapper was recycled and already contains another value
312+
// release the reference added in this method
313+
value.release();
314+
return null;
267315
}
268316
}
269317

@@ -280,7 +328,7 @@ public Collection<Value> getRange(Key first, Key last) {
280328

281329
// Return the values of the entries found in cache
282330
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : entries.subMap(first, true, last, true).entrySet()) {
283-
Value value = getValue(entry.getKey(), entry.getValue(), true);
331+
Value value = getValueMatchingEntry(entry);
284332
if (value != null) {
285333
values.add(value);
286334
}
@@ -297,6 +345,9 @@ public Collection<Value> getRange(Key first, Key last) {
297345
* @return an pair of ints, containing the number of removed entries and the total size
298346
*/
299347
public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusive) {
348+
if (log.isDebugEnabled()) {
349+
log.debug("Removing entries in range [{}, {}], lastInclusive: {}", first, last, lastInclusive);
350+
}
300351
RemovalCounters counters = RemovalCounters.create();
301352
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
302353
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : subMap.entrySet()) {
@@ -320,7 +371,7 @@ private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> e
320371
boolean skipInvalid, Predicate<Value> removeCondition) {
321372
Key key = entry.getKey();
322373
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
323-
Value value = entryWrapper.getValue(key, true);
374+
Value value = getValueMatchingEntry(entry);
324375
if (value == null) {
325376
// the wrapper has already been recycled and contains another key
326377
if (!skipInvalid) {
@@ -404,6 +455,9 @@ private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
404455
* @return a pair containing the number of entries evicted and their total size
405456
*/
406457
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
458+
if (log.isDebugEnabled()) {
459+
log.debug("Evicting entries to reach a minimum size of {}", minSize);
460+
}
407461
checkArgument(minSize > 0);
408462
RemovalCounters counters = RemovalCounters.create();
409463
while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) {
@@ -422,6 +476,9 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
422476
* @return the tota
423477
*/
424478
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
479+
if (log.isDebugEnabled()) {
480+
log.debug("Evicting entries with timestamp <= {}", maxTimestamp);
481+
}
425482
RemovalCounters counters = RemovalCounters.create();
426483
while (!Thread.currentThread().isInterrupted()) {
427484
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();
@@ -453,6 +510,9 @@ public long getSize() {
453510
* @return size of removed entries
454511
*/
455512
public Pair<Integer, Long> clear() {
513+
if (log.isDebugEnabled()) {
514+
log.debug("Clearing the cache with {} entries and size {}", entries.size(), size.get());
515+
}
456516
RemovalCounters counters = RemovalCounters.create();
457517
while (!Thread.currentThread().isInterrupted()) {
458518
Map.Entry<Key, EntryWrapper<Key, Value>> entry = entries.firstEntry();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,28 @@
2424
import static org.testng.Assert.assertNotNull;
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
27-
2827
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import io.netty.buffer.ByteBuf;
2929
import java.nio.charset.StandardCharsets;
3030
import java.util.ArrayList;
3131
import java.util.List;
3232
import java.util.UUID;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.CopyOnWriteArrayList;
3335
import java.util.concurrent.CountDownLatch;
3436
import java.util.concurrent.CyclicBarrier;
3537
import java.util.concurrent.Future;
38+
import java.util.concurrent.ThreadLocalRandom;
3639
import java.util.concurrent.TimeUnit;
3740
import java.util.concurrent.atomic.AtomicBoolean;
3841
import java.util.concurrent.atomic.AtomicReference;
39-
42+
import lombok.Cleanup;
4043
import lombok.extern.slf4j.Slf4j;
4144
import org.apache.bookkeeper.client.BookKeeper;
4245
import org.apache.bookkeeper.client.BookKeeperTestClient;
4346
import org.apache.bookkeeper.client.LedgerEntry;
4447
import org.apache.bookkeeper.client.api.DigestType;
48+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
4549
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
4650
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
4751
import org.apache.bookkeeper.mledger.Entry;
@@ -53,18 +57,17 @@
5357
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
5458
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
5559
import org.apache.bookkeeper.mledger.Position;
60+
import org.apache.bookkeeper.mledger.PositionFactory;
5661
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
5762
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
5863
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
5964
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
6065
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
66+
import org.apache.pulsar.common.util.FutureUtil;
6167
import org.awaitility.Awaitility;
6268
import org.testng.annotations.DataProvider;
6369
import org.testng.annotations.Test;
6470

65-
import io.netty.buffer.ByteBuf;
66-
import lombok.Cleanup;
67-
6871
@Slf4j
6972
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
7073

@@ -241,6 +244,108 @@ public void verifyConcurrentUsage() throws Exception {
241244
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
242245
}
243246

247+
@Test
248+
public void verifyAsyncReadEntryUsingCache() throws Exception {
249+
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
250+
251+
config.setMaxCacheSize(100 * 1024 * 1024);
252+
config.setCacheEvictionTimeThresholdMillis(10000);
253+
config.setCacheEvictionIntervalMs(10000);
254+
255+
@Cleanup("shutdown")
256+
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
257+
258+
ManagedLedgerConfig conf = new ManagedLedgerConfig();
259+
conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2)
260+
.setRetentionSizeInMB(-1).setRetentionTime(-1, TimeUnit.MILLISECONDS);
261+
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf);
262+
263+
int NumProducers = 5;
264+
int NumConsumers = 10;
265+
266+
final AtomicBoolean done = new AtomicBoolean();
267+
final CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1);
268+
269+
List<Future<?>> futures = new ArrayList();
270+
List<Position> positions = new CopyOnWriteArrayList<>();
271+
272+
for (int i = 0; i < NumProducers; i++) {
273+
futures.add(executor.submit(() -> {
274+
try {
275+
// wait for all threads to be ready to start at once
276+
barrier.await();
277+
while (!done.get()) {
278+
Position position = ledger.addEntry("entry".getBytes());
279+
positions.add(position);
280+
Thread.sleep(1);
281+
}
282+
} catch (Exception e) {
283+
e.printStackTrace();
284+
throw FutureUtil.wrapToCompletionException(e);
285+
}
286+
}));
287+
}
288+
289+
// create a dummy cursor since caching happens only when there are active consumers
290+
ManagedCursor cursor = ledger.openCursor("dummy");
291+
292+
for (int i = 0; i < NumConsumers; i++) {
293+
futures.add(executor.submit(() -> {
294+
try {
295+
// wait for all threads to be ready to start at once
296+
barrier.await();
297+
while (!done.get()) {
298+
if (positions.isEmpty()) {
299+
Thread.sleep(1);
300+
continue;
301+
}
302+
// Simulate a replay queue read pattern where individual entries are read
303+
Position randomPosition = positions.get(ThreadLocalRandom.current().nextInt(positions.size()));
304+
// Clone the original instance so that another instance is used in the asyncReadEntry call
305+
// This is to test that keys are compared by .equals and not by reference under the covers
306+
randomPosition = PositionFactory.create(randomPosition);
307+
CompletableFuture<Void> future = new CompletableFuture<>();
308+
ledger.asyncReadEntry(randomPosition, new AsyncCallbacks.ReadEntryCallback() {
309+
@Override
310+
public void readEntryComplete(Entry entry, Object ctx) {
311+
entry.release();
312+
future.complete(null);
313+
}
314+
315+
@Override
316+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
317+
future.completeExceptionally(exception);
318+
}
319+
}, null);
320+
future.get();
321+
Thread.sleep(2);
322+
}
323+
} catch (Exception e) {
324+
e.printStackTrace();
325+
throw FutureUtil.wrapToCompletionException(e);
326+
}
327+
}));
328+
}
329+
330+
// trigger all worker threads at once to continue from the barrier
331+
barrier.await();
332+
333+
int testDurationSeconds = 3;
334+
Thread.sleep(testDurationSeconds * 1000);
335+
336+
done.set(true);
337+
for (Future<?> future : futures) {
338+
future.get();
339+
}
340+
341+
factory.getMbean().refreshStats(testDurationSeconds, TimeUnit.SECONDS);
342+
343+
assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
344+
assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
345+
assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
346+
assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
347+
}
348+
244349
@Test
245350
public void testSimple() throws Exception {
246351
@Cleanup("shutdown")

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,10 @@ public void testEnableTopicDelayedDelivery() throws Exception {
483483
break;
484484
}
485485
}
486-
producer.newMessage().value("long-tick-msg").deliverAfter(2, TimeUnit.SECONDS).send();
486+
producer.newMessage().value("long-tick-msg").deliverAfter(3, TimeUnit.SECONDS).send();
487487
msg = consumer.receive(1, TimeUnit.SECONDS);
488488
assertNull(msg);
489-
msg = consumer.receive(3, TimeUnit.SECONDS);
489+
msg = consumer.receive(4, TimeUnit.SECONDS);
490490
assertNotNull(msg);
491491
}
492492

0 commit comments

Comments
 (0)