Skip to content

Commit 0ae3f9d

Browse files
authored
[fix][broker] Fix the retry mechanism in MetadataCache#readModifyUpdateOrCreate (#23686)
## Motivation The method `MetadataCache#readModifyUpdateOrCreate` should handle the BadVersionException by retrying the modification process, as already noted in the Java documentation: "The modify function can potentially be called multiple times due to concurrent updates." Currently, `MetadataCache#readModifyUpdateOrCreate` does not catch the BadVersionException on the second attempt, so the exception propagates to the caller. This issue can be easily reproduced by increasing the number of concurrent futures in the test `MetadataCacheTest#readModifyUpdateBadVersionRetry`. The current retry implementation is incorrect and lacks a backoff mechanism, which could lead to too many requests to the metadata store. ## Modification - Correct the retry process in `MetadataCache#readModifyUpdateOrCreate` to ensure BadVersionException is caught during each retry. - Implement a retry backoff mechanism in `MetadataCache#readModifyUpdateOrCreate` to manage the frequency of retries effectively. - Add a new config `retryBackoff` to the MetadataCacheConfig to control the MetadataCache retry backoff. - Respect the `metadataStoreOperationTimeoutSeconds` setting for the MetadataCache retry.
1 parent 0845c21 commit 0ae3f9d

File tree

7 files changed

+135
-39
lines changed

7 files changed

+135
-39
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import lombok.Getter;
3535
import lombok.extern.slf4j.Slf4j;
3636
import org.apache.pulsar.metadata.api.MetadataCache;
37+
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
3738
import org.apache.pulsar.metadata.api.MetadataStore;
3839
import org.apache.pulsar.metadata.api.MetadataStoreException;
3940

@@ -58,13 +59,19 @@ public class BaseResources<T> {
5859

5960
public BaseResources(MetadataStore store, Class<T> clazz, int operationTimeoutSec) {
6061
this.store = store;
61-
this.cache = store.getMetadataCache(clazz);
62+
this.cache = store.getMetadataCache(clazz, MetadataCacheConfig.builder()
63+
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
64+
TimeUnit.SECONDS))
65+
.build());
6266
this.operationTimeoutSec = operationTimeoutSec;
6367
}
6468

6569
public BaseResources(MetadataStore store, TypeReference<T> typeRef, int operationTimeoutSec) {
6670
this.store = store;
67-
this.cache = store.getMetadataCache(typeRef);
71+
this.cache = store.getMetadataCache(typeRef, MetadataCacheConfig.builder()
72+
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
73+
TimeUnit.SECONDS))
74+
.build());
6875
this.operationTimeoutSec = operationTimeoutSec;
6976
}
7077

pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.common.util;
2020

21-
import com.google.common.annotations.VisibleForTesting;
2221
import java.time.Clock;
2322
import java.util.Random;
2423
import java.util.concurrent.TimeUnit;
@@ -95,11 +94,6 @@ public void reset() {
9594
this.mandatoryStopMade = false;
9695
}
9796

98-
@VisibleForTesting
99-
long getFirstBackoffTimeInMillis() {
100-
return firstBackoffTimeInMillis;
101-
}
102-
10397
public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
10498
long defaultInterval, long maxBackoffInterval) {
10599
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);

pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public void mandatoryStopTest() {
136136

137137
// would have been 1600 w/o the mandatory stop
138138
assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
139+
assertTrue(backoff.isMandatoryStopMade());
139140
Mockito.when(mockClock.millis()).thenReturn(1900L);
140141
assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200));
141142
Mockito.when(mockClock.millis()).thenReturn(3200L);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import lombok.Builder;
2525
import lombok.Getter;
2626
import lombok.ToString;
27+
import org.apache.pulsar.common.util.BackoffBuilder;
2728

2829
/**
2930
* The configuration builder for a {@link MetadataCache} config.
@@ -33,6 +34,10 @@
3334
@ToString
3435
public class MetadataCacheConfig<T> {
3536
private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
37+
public static final BackoffBuilder DEFAULT_RETRY_BACKOFF_BUILDER =
38+
new BackoffBuilder().setInitialTime(5, TimeUnit.MILLISECONDS)
39+
.setMax(3, TimeUnit.SECONDS)
40+
.setMandatoryStop(30, TimeUnit.SECONDS);
3641

3742
/**
3843
* Specifies that active entries are eligible for automatic refresh once a fixed duration has
@@ -57,4 +62,7 @@ public class MetadataCacheConfig<T> {
5762
@Builder.Default
5863
private final BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer = null;
5964

65+
@Builder.Default
66+
private final BackoffBuilder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER;
67+
6068
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@
3030
import java.util.Optional;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.Executor;
33+
import java.util.concurrent.ScheduledExecutorService;
3334
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3436
import java.util.function.Consumer;
3537
import java.util.function.Function;
3638
import java.util.function.Supplier;
3739
import lombok.Getter;
3840
import lombok.extern.slf4j.Slf4j;
3941
import org.apache.bookkeeper.common.concurrent.FutureUtils;
42+
import org.apache.pulsar.common.util.Backoff;
4043
import org.apache.pulsar.metadata.api.CacheGetResult;
4144
import org.apache.pulsar.metadata.api.GetResult;
4245
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -58,25 +61,32 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
5861
private final MetadataStore store;
5962
private final MetadataStoreExtended storeExtended;
6063
private final MetadataSerde<T> serde;
64+
private final ScheduledExecutorService executor;
65+
private final MetadataCacheConfig<T> cacheConfig;
6166

6267
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
6368

64-
public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig<T> cacheConfig) {
65-
this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig);
69+
public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig<T> cacheConfig,
70+
ScheduledExecutorService executor) {
71+
this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor);
6672
}
6773

68-
public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig) {
69-
this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig);
74+
public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig,
75+
ScheduledExecutorService executor) {
76+
this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor);
7077
}
7178

72-
public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig<T> cacheConfig) {
79+
public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig<T> cacheConfig,
80+
ScheduledExecutorService executor) {
7381
this.store = store;
7482
if (store instanceof MetadataStoreExtended) {
7583
this.storeExtended = (MetadataStoreExtended) store;
7684
} else {
7785
this.storeExtended = null;
7886
}
7987
this.serde = serde;
88+
this.cacheConfig = cacheConfig;
89+
this.executor = executor;
8090

8191
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
8292
if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -321,22 +331,34 @@ public void accept(Notification t) {
321331
}
322332
}
323333

324-
private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
325-
CompletableFuture<T> result = new CompletableFuture<>();
334+
private void execute(Supplier<CompletableFuture<T>> op, String key, CompletableFuture<T> result, Backoff backoff) {
326335
op.get().thenAccept(result::complete).exceptionally((ex) -> {
327336
if (ex.getCause() instanceof BadVersionException) {
328337
// if resource is updated by other than metadata-cache then metadata-cache will get bad-version
329338
// exception. So, invalidate the cache and retry with backoff until the mandatory stop is reached.
330339
objCache.synchronous().invalidate(key);
331-
op.get().thenAccept(result::complete).exceptionally((ex1) -> {
332-
result.completeExceptionally(ex1.getCause());
340+
long elapsed = System.currentTimeMillis() - backoff.getFirstBackoffTimeInMillis();
341+
if (backoff.isMandatoryStopMade()) {
342+
result.completeExceptionally(new TimeoutException(
343+
String.format("Timeout to update key %s. Elapsed time: %d ms", key, elapsed)));
333344
return null;
334-
});
345+
}
346+
final var next = backoff.next();
347+
log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key,
348+
next, backoff.isMandatoryStopMade(), elapsed);
349+
executor.schedule(() -> execute(op, key, result, backoff), next,
350+
TimeUnit.MILLISECONDS);
335351
return null;
336352
}
337353
result.completeExceptionally(ex.getCause());
338354
return null;
339355
});
356+
}
357+
358+
private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
359+
final var backoff = cacheConfig.getRetryBackoff().create();
360+
CompletableFuture<T> result = new CompletableFuture<>();
361+
execute(op, key, result, backoff);
340362
return result;
341363
}
342364
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,21 +236,21 @@ protected boolean shouldIgnoreEvent(MetadataEvent event, GetResult existingValue
236236
@Override
237237
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
238238
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
239-
TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig);
239+
TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig, this.executor);
240240
metadataCaches.add(metadataCache);
241241
return metadataCache;
242242
}
243243

244244
@Override
245245
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
246-
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig);
246+
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig, this.executor);
247247
metadataCaches.add(metadataCache);
248248
return metadataCache;
249249
}
250250

251251
@Override
252252
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
253-
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig);
253+
MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig, this.executor);
254254
metadataCaches.add(metadataCache);
255255
return metadataCache;
256256
}

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.pulsar.metadata;
2020

21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.spy;
2124
import static org.testng.Assert.assertEquals;
2225
import static org.testng.Assert.assertNotEquals;
2326
import static org.testng.Assert.assertNotSame;
@@ -26,7 +29,9 @@
2629
import static org.testng.Assert.fail;
2730
import com.fasterxml.jackson.annotation.JsonIgnore;
2831
import com.fasterxml.jackson.core.type.TypeReference;
32+
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
2933
import java.io.IOException;
34+
import java.lang.reflect.Field;
3035
import java.nio.charset.StandardCharsets;
3136
import java.util.ArrayList;
3237
import java.util.EnumSet;
@@ -36,6 +41,8 @@
3641
import java.util.TreeMap;
3742
import java.util.concurrent.CompletableFuture;
3843
import java.util.concurrent.CompletionException;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
3946
import java.util.concurrent.atomic.AtomicReference;
4047
import java.util.function.Supplier;
4148
import lombok.AllArgsConstructor;
@@ -44,13 +51,16 @@
4451
import lombok.NoArgsConstructor;
4552
import lombok.extern.slf4j.Slf4j;
4653
import org.apache.pulsar.common.policies.data.Policies;
54+
import org.apache.pulsar.common.util.BackoffBuilder;
55+
import org.apache.pulsar.common.util.FutureUtil;
4756
import org.apache.pulsar.common.util.ObjectMapperFactory;
4857
import org.apache.pulsar.metadata.api.CacheGetResult;
4958
import org.apache.pulsar.metadata.api.MetadataCache;
5059
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
5160
import org.apache.pulsar.metadata.api.MetadataSerde;
5261
import org.apache.pulsar.metadata.api.MetadataStore;
5362
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
63+
import org.apache.pulsar.metadata.api.MetadataStoreException;
5464
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
5565
import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
5666
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
@@ -60,6 +70,7 @@
6070
import org.apache.pulsar.metadata.api.extended.CreateOption;
6171
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
6272
import org.awaitility.Awaitility;
73+
import org.mockito.stubbing.Answer;
6374
import org.testng.annotations.DataProvider;
6475
import org.testng.annotations.Test;
6576

@@ -488,32 +499,74 @@ public void readModifyUpdate(String provider, Supplier<String> urlSupplier) thro
488499
public void readModifyUpdateBadVersionRetry() throws Exception {
489500
String url = zks.getConnectionString();
490501
@Cleanup
491-
MetadataStore sourceStore1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
492-
@Cleanup
493-
MetadataStore sourceStore2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
502+
MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
494503

495-
MetadataCache<MyClass> objCache1 = sourceStore1.getMetadataCache(MyClass.class);
496-
MetadataCache<MyClass> objCache2 = sourceStore2.getMetadataCache(MyClass.class);
504+
MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class);
497505

498506
String key1 = newKey();
499507

500508
MyClass value1 = new MyClass("a", 1);
501-
objCache1.create(key1, value1).join();
502-
assertEquals(objCache1.get(key1).join().get().b, 1);
509+
cache.create(key1, value1).join();
510+
assertEquals(cache.get(key1).join().get().b, 1);
503511

504-
CompletableFuture<MyClass> future1 = objCache1.readModifyUpdate(key1, v -> {
505-
return new MyClass(v.a, v.b + 1);
506-
});
507-
508-
CompletableFuture<MyClass> future2 = objCache2.readModifyUpdate(key1, v -> {
509-
return new MyClass(v.a, v.b + 1);
510-
});
512+
final var futures = new ArrayList<CompletableFuture<MyClass>>();
513+
final var sourceStores = new ArrayList<MetadataStore>();
511514

512-
MyClass myClass1 = future1.join();
513-
assertEquals(myClass1.b, 2);
515+
for (int i = 0; i < 20; i++) {
516+
final var sourceStore = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
517+
sourceStores.add(sourceStore);
518+
final var objCache = sourceStore.getMetadataCache(MyClass.class);
519+
futures.add(objCache.readModifyUpdate(key1, v -> new MyClass(v.a, v.b + 1)));
520+
}
521+
FutureUtil.waitForAll(futures).join();
522+
for (var sourceStore : sourceStores) {
523+
sourceStore.close();
524+
}
525+
}
514526

515-
MyClass myClass2 = future2.join();
516-
assertEquals(myClass2.b, 3);
527+
@Test
528+
public void readModifyUpdateOrCreateRetryTimeout() throws Exception {
529+
String url = zks.getConnectionString();
530+
@Cleanup
531+
MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
532+
533+
MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class, MetadataCacheConfig.builder()
534+
.retryBackoff(new BackoffBuilder()
535+
.setInitialTime(5, TimeUnit.MILLISECONDS)
536+
.setMax(1, TimeUnit.SECONDS)
537+
.setMandatoryStop(3, TimeUnit.SECONDS)).build());
538+
539+
Field metadataCacheField = cache.getClass().getDeclaredField("objCache");
540+
metadataCacheField.setAccessible(true);
541+
var objCache = metadataCacheField.get(cache);
542+
var spyObjCache = (AsyncLoadingCache<?, ?>) spy(objCache);
543+
doAnswer((Answer<CompletableFuture<MyClass>>) invocation -> CompletableFuture.failedFuture(
544+
new MetadataStoreException.BadVersionException(""))).when(spyObjCache).get(any());
545+
metadataCacheField.set(cache, spyObjCache);
546+
547+
// Test three times to ensure that the retry works each time.
548+
for (int i = 0; i < 3; i++) {
549+
var start = System.currentTimeMillis();
550+
boolean timeouted = false;
551+
try {
552+
cache.readModifyUpdateOrCreate(newKey(), Optional::get).join();
553+
} catch (CompletionException e) {
554+
if (e.getCause() instanceof TimeoutException) {
555+
var elapsed = System.currentTimeMillis() - start;
556+
// Since we reduce the wait time by a random amount for each retry, the total elapsed time should be
557+
// at least mandatoryStopTime - maxTime * 0.1, which is 2900ms.
558+
assertTrue(elapsed >= 2900L,
559+
"The elapsed time should be greater than the timeout. But now it's " + elapsed);
560+
// The elapsed time should be less than the timeout. The 1.5 factor allows for some extra time.
561+
assertTrue(elapsed < 3000L * 1.5,
562+
"The retry should have been stopped after the timeout. But now it's " + elapsed);
563+
timeouted = true;
564+
} else {
565+
fail("Should have failed with TimeoutException, but failed with " + e.getCause());
566+
}
567+
}
568+
assertTrue(timeouted, "Should have failed with TimeoutException, but succeeded");
569+
}
517570
}
518571

519572
@Test(dataProvider = "impl")
@@ -647,4 +700,15 @@ public void testAsyncReloadConsumer(String provider, Supplier<String> urlSupplie
647700
refreshed.contains(value2);
648701
});
649702
}
703+
704+
@Test
705+
public void testDefaultMetadataCacheConfig() {
706+
final var config = MetadataCacheConfig.builder().build();
707+
assertEquals(config.getRefreshAfterWriteMillis(), TimeUnit.MINUTES.toMillis(5));
708+
assertEquals(config.getExpireAfterWriteMillis(), TimeUnit.MINUTES.toMillis(10));
709+
final var backoff = config.getRetryBackoff().create();
710+
assertEquals(backoff.getInitial(), 5);
711+
assertEquals(backoff.getMax(), 3000);
712+
assertEquals(backoff.getMandatoryStop(), 30_000);
713+
}
650714
}

0 commit comments

Comments
 (0)