Skip to content

Commit e447ade

Browse files
fix: product-type cache race condition
1 parent 317d55b commit e447ade

File tree

2 files changed

+223
-8
lines changed

2 files changed

+223
-8
lines changed

src/integration-test/java/com/commercetools/sync/integration/services/impl/ProductTypeServiceImplIT.java

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,4 +448,175 @@ void updateProductType_WithValidChanges_ShouldUpdateProductTypeCorrectly() {
448448
assertThat(fetchedProductType.getName()).isEqualTo(updatedProductType.getName());
449449
assertThat(fetchedProductType.getAttributes()).isEqualTo(updatedProductType.getAttributes());
450450
}
451+
452+
/*
453+
* This test verifies the cache stampede fix by making concurrent calls
454+
and ensuring the cache is populated correctly without race conditions.
455+
456+
What this test verifies:
457+
1. All concurrent calls complete successfully (no race conditions)
458+
2. All calls return the same cached data (cache consistency)
459+
3. No exceptions occur during concurrent access
460+
461+
NOTE: The tests were execute with logs and it can be seen that only one query is executed.
462+
*/
463+
@Test
464+
void
465+
fetchCachedProductAttributeMetaDataMap_WithConcurrentCalls_ShouldHandleCacheStampedeCorrectly()
466+
throws Exception {
467+
468+
// preparation - create a product type with attributes
469+
final ProductTypeDraft productTypeDraft =
470+
ProductTypeDraftBuilder.of()
471+
.key("cache-stampede-test-type")
472+
.name("Cache Stampede Test Type")
473+
.description("Test product type for cache stampede fix")
474+
.attributes(ATTRIBUTE_DEFINITION_DRAFT_1)
475+
.build();
476+
477+
final ProductType createdProductType =
478+
CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody();
479+
480+
final ProductTypeSyncOptions productTypeSyncOptions =
481+
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
482+
final ProductTypeService productTypeService = new ProductTypeServiceImpl(productTypeSyncOptions);
483+
484+
// test - make 10 concurrent calls to fetchCachedProductAttributeMetaDataMap
485+
// Used a CountDownLatch to ensure all threads start at approximately the same time
486+
final int numberOfConcurrentCalls = 10;
487+
final java.util.concurrent.CountDownLatch startLatch =
488+
new java.util.concurrent.CountDownLatch(1);
489+
final java.util.concurrent.CountDownLatch readyLatch =
490+
new java.util.concurrent.CountDownLatch(numberOfConcurrentCalls);
491+
final java.util.concurrent.ExecutorService executorService =
492+
java.util.concurrent.Executors.newFixedThreadPool(numberOfConcurrentCalls);
493+
final java.util.List<java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>>>
494+
futures = new java.util.ArrayList<>();
495+
496+
for (int i = 0; i < numberOfConcurrentCalls; i++) {
497+
final java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>> future =
498+
java.util.concurrent.CompletableFuture.supplyAsync(
499+
() -> {
500+
try {
501+
readyLatch.countDown();
502+
startLatch.await(); // Wait for all threads to be ready
503+
return productTypeService
504+
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
505+
.toCompletableFuture()
506+
.join();
507+
} catch (InterruptedException e) {
508+
Thread.currentThread().interrupt();
509+
throw new RuntimeException(e);
510+
}
511+
},
512+
executorService);
513+
futures.add(future);
514+
}
515+
516+
readyLatch.await(5, java.util.concurrent.TimeUnit.SECONDS);
517+
518+
// Start all threads at once
519+
startLatch.countDown();
520+
521+
// Wait for all futures to complete
522+
java.util.concurrent.CompletableFuture.allOf(futures.toArray(new java.util.concurrent.CompletableFuture[0]))
523+
.join();
524+
525+
executorService.shutdown();
526+
executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS);
527+
528+
// assertions - all calls should return the same result
529+
final Optional<Map<String, AttributeMetaData>> firstResult = futures.get(0).join();
530+
assertThat(firstResult).isPresent();
531+
532+
for (java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>> future :
533+
futures) {
534+
assertThat(future).isCompleted();
535+
final Optional<Map<String, AttributeMetaData>> result = future.join();
536+
assertThat(result).isPresent();
537+
assertThat(result.get()).containsKey(ATTRIBUTE_DEFINITION_DRAFT_1.getName());
538+
// Verify all results are identical (same cached instance)
539+
assertThat(result.get()).isEqualTo(firstResult.get());
540+
}
541+
542+
// cleanup
543+
CTP_TARGET_CLIENT
544+
.productTypes()
545+
.withId(createdProductType.getId())
546+
.delete()
547+
.withVersion(createdProductType.getVersion())
548+
.execute()
549+
.join();
550+
}
551+
552+
@Test
553+
void fetchCachedProductAttributeMetaDataMap_WithPopulatedCache_ShouldReturnCachedData() {
554+
// This test verifies that after the first call, subsequent calls use the cache
555+
556+
// preparation - create a product type
557+
final ProductTypeDraft productTypeDraft =
558+
ProductTypeDraftBuilder.of()
559+
.key("cache-reuse-test-type")
560+
.name("Cache Reuse Test Type")
561+
.description("Test product type for cache reuse")
562+
.attributes(ATTRIBUTE_DEFINITION_DRAFT_1, ATTRIBUTE_DEFINITION_DRAFT_2)
563+
.build();
564+
565+
final ProductType createdProductType =
566+
CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody();
567+
568+
final ProductTypeSyncOptions productTypeSyncOptions =
569+
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
570+
final ProductTypeService productTypeService = new ProductTypeServiceImpl(productTypeSyncOptions);
571+
572+
// test - first call to populate cache
573+
final Optional<Map<String, AttributeMetaData>> firstResult =
574+
productTypeService
575+
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
576+
.toCompletableFuture()
577+
.join();
578+
579+
// test - second call should use cache
580+
final Optional<Map<String, AttributeMetaData>> secondResult =
581+
productTypeService
582+
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
583+
.toCompletableFuture()
584+
.join();
585+
586+
// assertions
587+
assertThat(firstResult).isPresent();
588+
assertThat(secondResult).isPresent();
589+
assertThat(firstResult.get()).isEqualTo(secondResult.get());
590+
assertThat(firstResult.get()).hasSize(2);
591+
assertThat(firstResult.get()).containsKeys(
592+
ATTRIBUTE_DEFINITION_DRAFT_1.getName(), ATTRIBUTE_DEFINITION_DRAFT_2.getName());
593+
594+
// cleanup
595+
CTP_TARGET_CLIENT
596+
.productTypes()
597+
.withId(createdProductType.getId())
598+
.delete()
599+
.withVersion(createdProductType.getVersion())
600+
.execute()
601+
.join();
602+
}
603+
604+
@Test
605+
void
606+
fetchCachedProductAttributeMetaDataMap_WithNonExistentProductType_ShouldReturnEmptyOptional() {
607+
// preparation
608+
final ProductTypeSyncOptions productTypeSyncOptions =
609+
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
610+
final ProductTypeService productTypeService = new ProductTypeServiceImpl(productTypeSyncOptions);
611+
612+
// test - query for non-existent product type ID
613+
final Optional<Map<String, AttributeMetaData>> result =
614+
productTypeService
615+
.fetchCachedProductAttributeMetaDataMap("non-existent-id-12345")
616+
.toCompletableFuture()
617+
.join();
618+
619+
// assertions
620+
assertThat(result).isEmpty();
621+
}
451622
}

src/main/java/com/commercetools/sync/services/impl/ProductTypeServiceImpl.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public final class ProductTypeServiceImpl
4343
private final Map<String, Map<String, AttributeMetaData>> productsAttributesMetaData =
4444
new ConcurrentHashMap<>();
4545

46+
private volatile CompletableFuture<Void> cacheLoadingFuture = null;
47+
private final Object cacheLoadingLock = new Object();
48+
4649
public ProductTypeServiceImpl(@Nonnull final BaseSyncOptions syncOptions) {
4750
super(syncOptions);
4851
}
@@ -82,16 +85,58 @@ private static Map<String, AttributeMetaData> getAttributeMetaDataMap(
8285
public CompletionStage<Optional<Map<String, AttributeMetaData>>>
8386
fetchCachedProductAttributeMetaDataMap(@Nonnull final String productTypeId) {
8487

85-
if (productsAttributesMetaData.isEmpty()) {
86-
return fetchAndCacheProductMetaData(productTypeId);
88+
if (!productsAttributesMetaData.isEmpty()) {
89+
return CompletableFuture.completedFuture(
90+
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
91+
}
92+
93+
CompletableFuture<Void> loadingFuture = cacheLoadingFuture;
94+
if (loadingFuture != null && !loadingFuture.isDone()) {
95+
return loadingFuture.thenApply(
96+
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
97+
}
98+
99+
synchronized (cacheLoadingLock) {
100+
if (!productsAttributesMetaData.isEmpty()) {
101+
return CompletableFuture.completedFuture(
102+
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
103+
}
104+
105+
loadingFuture = cacheLoadingFuture;
106+
if (loadingFuture != null && !loadingFuture.isDone()) {
107+
return loadingFuture.thenApply(
108+
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
109+
}
110+
111+
final CompletableFuture<Void> newLoadingFuture = new CompletableFuture<>();
112+
cacheLoadingFuture = newLoadingFuture;
113+
114+
fetchAndCacheAllProductMetaData()
115+
.whenComplete(
116+
(result, throwable) -> {
117+
if (throwable != null) {
118+
newLoadingFuture.completeExceptionally(throwable);
119+
} else {
120+
newLoadingFuture.complete(null);
121+
}
122+
});
123+
124+
return newLoadingFuture.thenApply(
125+
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
87126
}
88-
return CompletableFuture.completedFuture(
89-
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
90127
}
91128

129+
/**
130+
* Fetches all product types from CTP and caches their attribute metadata.
131+
*
132+
* <p>This method is called only once during the first request for attribute metadata. All
133+
* subsequent requests will use the cached data.
134+
*
135+
* @return a {@link CompletionStage} that completes when all product types have been fetched and
136+
* cached
137+
*/
92138
@Nonnull
93-
private CompletionStage<Optional<Map<String, AttributeMetaData>>> fetchAndCacheProductMetaData(
94-
@Nonnull final String productTypeId) {
139+
private CompletionStage<Void> fetchAndCacheAllProductMetaData() {
95140
final Consumer<List<ProductType>> productTypePageConsumer =
96141
productTypePage ->
97142
productTypePage.forEach(
@@ -102,8 +147,7 @@ private CompletionStage<Optional<Map<String, AttributeMetaData>>> fetchAndCacheP
102147
final ByProjectKeyProductTypesGet byProjectKeyProductTypesGet =
103148
this.syncOptions.getCtpClient().productTypes().get();
104149

105-
return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer)
106-
.thenApply(result -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
150+
return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer);
107151
}
108152

109153
@Nonnull

0 commit comments

Comments
 (0)