Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@v4
- name: Fetch Library version
id: vars
run: echo ::set-output name=libVersion::${GITHUB_REF#refs/*/}
run: echo libVersion=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT
- name: benchmark test
if: ${{ success() }}
run: ./gradlew clean setLibraryVersion benchmark
Expand All @@ -41,7 +41,7 @@ jobs:
uses: actions/checkout@v4
- name: Set output
id: vars
run: echo ::set-output name=tag::${GITHUB_REF#refs/*/}
run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT
- name: Set up Java
uses: actions/setup-java@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
uses: actions/checkout@v4
- name: Fetch Library version
id: vars
run: echo ::set-output name=libVersion::${GITHUB_REF#refs/*/}
run: echo libVersion=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT
- name: benchmark test
if: ${{ success() }}
run: ./gradlew clean setLibraryVersion benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,4 +448,185 @@ void updateProductType_WithValidChanges_ShouldUpdateProductTypeCorrectly() {
assertThat(fetchedProductType.getName()).isEqualTo(updatedProductType.getName());
assertThat(fetchedProductType.getAttributes()).isEqualTo(updatedProductType.getAttributes());
}

/*
* This test verifies the cache stampede fix by making concurrent calls
and ensuring the cache is populated correctly without race conditions.

What this test verifies:
1. All concurrent calls complete successfully (no race conditions)
2. All calls return the same cached data (cache consistency)
3. No exceptions occur during concurrent access

NOTE: The tests were execute with logs and it can be seen that only one query is executed.
*/
@Test
void
fetchCachedProductAttributeMetaDataMap_WithConcurrentCalls_ShouldHandleCacheStampedeCorrectly()
throws Exception {

// preparation - create a product type with attributes
final ProductTypeDraft productTypeDraft =
ProductTypeDraftBuilder.of()
.key("cache-stampede-test-type")
.name("Cache Stampede Test Type")
.description("Test product type for cache stampede fix")
.attributes(ATTRIBUTE_DEFINITION_DRAFT_1)
.build();

final ProductType createdProductType =
CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody();

final ProductTypeSyncOptions productTypeSyncOptions =
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
final ProductTypeService productTypeService =
new ProductTypeServiceImpl(productTypeSyncOptions);

// test - make 10 concurrent calls to fetchCachedProductAttributeMetaDataMap
// Used a CountDownLatch to ensure all threads start at approximately the same time
final int numberOfConcurrentCalls = 10;
final java.util.concurrent.CountDownLatch startLatch =
new java.util.concurrent.CountDownLatch(1);
final java.util.concurrent.CountDownLatch readyLatch =
new java.util.concurrent.CountDownLatch(numberOfConcurrentCalls);
final java.util.concurrent.ExecutorService executorService =
java.util.concurrent.Executors.newFixedThreadPool(numberOfConcurrentCalls);
final java.util.List<
java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>>>
futures = new java.util.ArrayList<>();

for (int i = 0; i < numberOfConcurrentCalls; i++) {
final java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>>
future =
java.util.concurrent.CompletableFuture.supplyAsync(
() -> {
try {
readyLatch.countDown();
startLatch.await(); // Wait for all threads to be ready
return productTypeService
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
.toCompletableFuture()
.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
},
executorService);
futures.add(future);
}

final boolean allThreadsReady = readyLatch.await(5, java.util.concurrent.TimeUnit.SECONDS);
assertThat(allThreadsReady).as("All threads should be ready within timeout").isTrue();

// Start all threads at once
startLatch.countDown();

// Wait for all futures to complete
java.util.concurrent.CompletableFuture.allOf(
futures.toArray(new java.util.concurrent.CompletableFuture[0]))
.join();

executorService.shutdown();
final boolean executorTerminated =
executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS);
assertThat(executorTerminated).as("Executor service should terminate within timeout").isTrue();

// assertions - all calls should return the same result
final Optional<Map<String, AttributeMetaData>> firstResult = futures.get(0).join();
assertThat(firstResult).isPresent();

for (java.util.concurrent.CompletableFuture<Optional<Map<String, AttributeMetaData>>> future :
futures) {
assertThat(future).isCompleted();
final Optional<Map<String, AttributeMetaData>> result = future.join();
assertThat(result).isPresent();
assertThat(result.get()).containsKey(ATTRIBUTE_DEFINITION_DRAFT_1.getName());
// Verify all results are identical (same cached instance)
assertThat(result.get()).isEqualTo(firstResult.get());
}

// cleanup
CTP_TARGET_CLIENT
.productTypes()
.withId(createdProductType.getId())
.delete()
.withVersion(createdProductType.getVersion())
.execute()
.join();
}

@Test
void fetchCachedProductAttributeMetaDataMap_WithPopulatedCache_ShouldReturnCachedData() {
// This test verifies that after the first call, subsequent calls use the cache

// preparation - create a product type
final ProductTypeDraft productTypeDraft =
ProductTypeDraftBuilder.of()
.key("cache-reuse-test-type")
.name("Cache Reuse Test Type")
.description("Test product type for cache reuse")
.attributes(ATTRIBUTE_DEFINITION_DRAFT_1, ATTRIBUTE_DEFINITION_DRAFT_2)
.build();

final ProductType createdProductType =
CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody();

final ProductTypeSyncOptions productTypeSyncOptions =
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
final ProductTypeService productTypeService =
new ProductTypeServiceImpl(productTypeSyncOptions);

// test - first call to populate cache
final Optional<Map<String, AttributeMetaData>> firstResult =
productTypeService
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
.toCompletableFuture()
.join();

// test - second call should use cache
final Optional<Map<String, AttributeMetaData>> secondResult =
productTypeService
.fetchCachedProductAttributeMetaDataMap(createdProductType.getId())
.toCompletableFuture()
.join();

// assertions
assertThat(firstResult).isPresent();
assertThat(secondResult).isPresent();
assertThat(firstResult.get()).isEqualTo(secondResult.get());
assertThat(firstResult.get()).hasSize(2);
assertThat(firstResult.get())
.containsKeys(
ATTRIBUTE_DEFINITION_DRAFT_1.getName(), ATTRIBUTE_DEFINITION_DRAFT_2.getName());

// cleanup
CTP_TARGET_CLIENT
.productTypes()
.withId(createdProductType.getId())
.delete()
.withVersion(createdProductType.getVersion())
.execute()
.join();
}

@Test
void
fetchCachedProductAttributeMetaDataMap_WithNonExistentProductType_ShouldReturnEmptyOptional() {
// preparation
final ProductTypeSyncOptions productTypeSyncOptions =
ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build();
final ProductTypeService productTypeService =
new ProductTypeServiceImpl(productTypeSyncOptions);

// test - query for non-existent product type ID
final Optional<Map<String, AttributeMetaData>> result =
productTypeService
.fetchCachedProductAttributeMetaDataMap("non-existent-id-12345")
.toCompletableFuture()
.join();

// assertions
assertThat(result).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public final class ProductTypeServiceImpl
private final Map<String, Map<String, AttributeMetaData>> productsAttributesMetaData =
new ConcurrentHashMap<>();

private volatile CompletableFuture<Void> cacheLoadingFuture = null;
private final Object cacheLoadingLock = new Object();

public ProductTypeServiceImpl(@Nonnull final BaseSyncOptions syncOptions) {
super(syncOptions);
}
Expand Down Expand Up @@ -82,16 +85,58 @@ private static Map<String, AttributeMetaData> getAttributeMetaDataMap(
public CompletionStage<Optional<Map<String, AttributeMetaData>>>
fetchCachedProductAttributeMetaDataMap(@Nonnull final String productTypeId) {

if (productsAttributesMetaData.isEmpty()) {
return fetchAndCacheProductMetaData(productTypeId);
if (!productsAttributesMetaData.isEmpty()) {
return CompletableFuture.completedFuture(
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}

CompletableFuture<Void> loadingFuture = cacheLoadingFuture;
if (loadingFuture != null && !loadingFuture.isDone()) {
return loadingFuture.thenApply(
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}

synchronized (cacheLoadingLock) {
if (!productsAttributesMetaData.isEmpty()) {
return CompletableFuture.completedFuture(
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}

loadingFuture = cacheLoadingFuture;
if (loadingFuture != null && !loadingFuture.isDone()) {
return loadingFuture.thenApply(
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}

final CompletableFuture<Void> newLoadingFuture = new CompletableFuture<>();
cacheLoadingFuture = newLoadingFuture;

fetchAndCacheAllProductMetaData()
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
newLoadingFuture.completeExceptionally(throwable);
} else {
newLoadingFuture.complete(null);
}
});

return newLoadingFuture.thenApply(
ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}
return CompletableFuture.completedFuture(
Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
}

/**
* Fetches all product types from CTP and caches their attribute metadata.
*
* <p>This method is called only once during the first request for attribute metadata. All
* subsequent requests will use the cached data.
*
* @return a {@link CompletionStage} that completes when all product types have been fetched and
* cached
*/
@Nonnull
private CompletionStage<Optional<Map<String, AttributeMetaData>>> fetchAndCacheProductMetaData(
@Nonnull final String productTypeId) {
private CompletionStage<Void> fetchAndCacheAllProductMetaData() {
final Consumer<List<ProductType>> productTypePageConsumer =
productTypePage ->
productTypePage.forEach(
Expand All @@ -102,8 +147,7 @@ private CompletionStage<Optional<Map<String, AttributeMetaData>>> fetchAndCacheP
final ByProjectKeyProductTypesGet byProjectKeyProductTypesGet =
this.syncOptions.getCtpClient().productTypes().get();

return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer)
.thenApply(result -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId)));
return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer);
}

@Nonnull
Expand Down