diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index d1deb7552c..83e6e62d99 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -24,7 +24,7 @@ jobs: uses: actions/checkout@v4 - name: Fetch Library version id: vars - run: echo ::set-output name=libVersion::${GITHUB_REF#refs/*/} + run: echo libVersion=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT - name: benchmark test if: ${{ success() }} run: ./gradlew clean setLibraryVersion benchmark @@ -41,7 +41,7 @@ jobs: uses: actions/checkout@v4 - name: Set output id: vars - run: echo ::set-output name=tag::${GITHUB_REF#refs/*/} + run: echo tag=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT - name: Set up Java uses: actions/setup-java@v4 with: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f243155e7d..c742d96023 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,7 @@ jobs: uses: actions/checkout@v4 - name: Fetch Library version id: vars - run: echo ::set-output name=libVersion::${GITHUB_REF#refs/*/} + run: echo libVersion=${GITHUB_REF#refs/*/} >> $GITHUB_OUTPUT - name: benchmark test if: ${{ success() }} run: ./gradlew clean setLibraryVersion benchmark diff --git a/src/integration-test/java/com/commercetools/sync/integration/services/impl/ProductTypeServiceImplIT.java b/src/integration-test/java/com/commercetools/sync/integration/services/impl/ProductTypeServiceImplIT.java index cf522f4b3a..c763a0ffe1 100644 --- a/src/integration-test/java/com/commercetools/sync/integration/services/impl/ProductTypeServiceImplIT.java +++ b/src/integration-test/java/com/commercetools/sync/integration/services/impl/ProductTypeServiceImplIT.java @@ -448,4 +448,185 @@ void updateProductType_WithValidChanges_ShouldUpdateProductTypeCorrectly() { assertThat(fetchedProductType.getName()).isEqualTo(updatedProductType.getName()); assertThat(fetchedProductType.getAttributes()).isEqualTo(updatedProductType.getAttributes()); } + + /* + * This test verifies the cache stampede fix by making concurrent calls + and ensuring the cache is populated correctly without race conditions. + + What this test verifies: + 1. All concurrent calls complete successfully (no race conditions) + 2. All calls return the same cached data (cache consistency) + 3. No exceptions occur during concurrent access + + NOTE: The tests were execute with logs and it can be seen that only one query is executed. + */ + @Test + void + fetchCachedProductAttributeMetaDataMap_WithConcurrentCalls_ShouldHandleCacheStampedeCorrectly() + throws Exception { + + // preparation - create a product type with attributes + final ProductTypeDraft productTypeDraft = + ProductTypeDraftBuilder.of() + .key("cache-stampede-test-type") + .name("Cache Stampede Test Type") + .description("Test product type for cache stampede fix") + .attributes(ATTRIBUTE_DEFINITION_DRAFT_1) + .build(); + + final ProductType createdProductType = + CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody(); + + final ProductTypeSyncOptions productTypeSyncOptions = + ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build(); + final ProductTypeService productTypeService = + new ProductTypeServiceImpl(productTypeSyncOptions); + + // test - make 10 concurrent calls to fetchCachedProductAttributeMetaDataMap + // Used a CountDownLatch to ensure all threads start at approximately the same time + final int numberOfConcurrentCalls = 10; + final java.util.concurrent.CountDownLatch startLatch = + new java.util.concurrent.CountDownLatch(1); + final java.util.concurrent.CountDownLatch readyLatch = + new java.util.concurrent.CountDownLatch(numberOfConcurrentCalls); + final java.util.concurrent.ExecutorService executorService = + java.util.concurrent.Executors.newFixedThreadPool(numberOfConcurrentCalls); + final java.util.List< + java.util.concurrent.CompletableFuture>>> + futures = new java.util.ArrayList<>(); + + for (int i = 0; i < numberOfConcurrentCalls; i++) { + final java.util.concurrent.CompletableFuture>> + future = + java.util.concurrent.CompletableFuture.supplyAsync( + () -> { + try { + readyLatch.countDown(); + startLatch.await(); // Wait for all threads to be ready + return productTypeService + .fetchCachedProductAttributeMetaDataMap(createdProductType.getId()) + .toCompletableFuture() + .join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, + executorService); + futures.add(future); + } + + final boolean allThreadsReady = readyLatch.await(5, java.util.concurrent.TimeUnit.SECONDS); + assertThat(allThreadsReady).as("All threads should be ready within timeout").isTrue(); + + // Start all threads at once + startLatch.countDown(); + + // Wait for all futures to complete + java.util.concurrent.CompletableFuture.allOf( + futures.toArray(new java.util.concurrent.CompletableFuture[0])) + .join(); + + executorService.shutdown(); + final boolean executorTerminated = + executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS); + assertThat(executorTerminated).as("Executor service should terminate within timeout").isTrue(); + + // assertions - all calls should return the same result + final Optional> firstResult = futures.get(0).join(); + assertThat(firstResult).isPresent(); + + for (java.util.concurrent.CompletableFuture>> future : + futures) { + assertThat(future).isCompleted(); + final Optional> result = future.join(); + assertThat(result).isPresent(); + assertThat(result.get()).containsKey(ATTRIBUTE_DEFINITION_DRAFT_1.getName()); + // Verify all results are identical (same cached instance) + assertThat(result.get()).isEqualTo(firstResult.get()); + } + + // cleanup + CTP_TARGET_CLIENT + .productTypes() + .withId(createdProductType.getId()) + .delete() + .withVersion(createdProductType.getVersion()) + .execute() + .join(); + } + + @Test + void fetchCachedProductAttributeMetaDataMap_WithPopulatedCache_ShouldReturnCachedData() { + // This test verifies that after the first call, subsequent calls use the cache + + // preparation - create a product type + final ProductTypeDraft productTypeDraft = + ProductTypeDraftBuilder.of() + .key("cache-reuse-test-type") + .name("Cache Reuse Test Type") + .description("Test product type for cache reuse") + .attributes(ATTRIBUTE_DEFINITION_DRAFT_1, ATTRIBUTE_DEFINITION_DRAFT_2) + .build(); + + final ProductType createdProductType = + CTP_TARGET_CLIENT.productTypes().post(productTypeDraft).execute().join().getBody(); + + final ProductTypeSyncOptions productTypeSyncOptions = + ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build(); + final ProductTypeService productTypeService = + new ProductTypeServiceImpl(productTypeSyncOptions); + + // test - first call to populate cache + final Optional> firstResult = + productTypeService + .fetchCachedProductAttributeMetaDataMap(createdProductType.getId()) + .toCompletableFuture() + .join(); + + // test - second call should use cache + final Optional> secondResult = + productTypeService + .fetchCachedProductAttributeMetaDataMap(createdProductType.getId()) + .toCompletableFuture() + .join(); + + // assertions + assertThat(firstResult).isPresent(); + assertThat(secondResult).isPresent(); + assertThat(firstResult.get()).isEqualTo(secondResult.get()); + assertThat(firstResult.get()).hasSize(2); + assertThat(firstResult.get()) + .containsKeys( + ATTRIBUTE_DEFINITION_DRAFT_1.getName(), ATTRIBUTE_DEFINITION_DRAFT_2.getName()); + + // cleanup + CTP_TARGET_CLIENT + .productTypes() + .withId(createdProductType.getId()) + .delete() + .withVersion(createdProductType.getVersion()) + .execute() + .join(); + } + + @Test + void + fetchCachedProductAttributeMetaDataMap_WithNonExistentProductType_ShouldReturnEmptyOptional() { + // preparation + final ProductTypeSyncOptions productTypeSyncOptions = + ProductTypeSyncOptionsBuilder.of(CTP_TARGET_CLIENT).build(); + final ProductTypeService productTypeService = + new ProductTypeServiceImpl(productTypeSyncOptions); + + // test - query for non-existent product type ID + final Optional> result = + productTypeService + .fetchCachedProductAttributeMetaDataMap("non-existent-id-12345") + .toCompletableFuture() + .join(); + + // assertions + assertThat(result).isEmpty(); + } } diff --git a/src/main/java/com/commercetools/sync/services/impl/ProductTypeServiceImpl.java b/src/main/java/com/commercetools/sync/services/impl/ProductTypeServiceImpl.java index 9db9d0297d..1755a6c880 100644 --- a/src/main/java/com/commercetools/sync/services/impl/ProductTypeServiceImpl.java +++ b/src/main/java/com/commercetools/sync/services/impl/ProductTypeServiceImpl.java @@ -43,6 +43,9 @@ public final class ProductTypeServiceImpl private final Map> productsAttributesMetaData = new ConcurrentHashMap<>(); + private volatile CompletableFuture cacheLoadingFuture = null; + private final Object cacheLoadingLock = new Object(); + public ProductTypeServiceImpl(@Nonnull final BaseSyncOptions syncOptions) { super(syncOptions); } @@ -82,16 +85,58 @@ private static Map getAttributeMetaDataMap( public CompletionStage>> fetchCachedProductAttributeMetaDataMap(@Nonnull final String productTypeId) { - if (productsAttributesMetaData.isEmpty()) { - return fetchAndCacheProductMetaData(productTypeId); + if (!productsAttributesMetaData.isEmpty()) { + return CompletableFuture.completedFuture( + Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); + } + + CompletableFuture loadingFuture = cacheLoadingFuture; + if (loadingFuture != null && !loadingFuture.isDone()) { + return loadingFuture.thenApply( + ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); + } + + synchronized (cacheLoadingLock) { + if (!productsAttributesMetaData.isEmpty()) { + return CompletableFuture.completedFuture( + Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); + } + + loadingFuture = cacheLoadingFuture; + if (loadingFuture != null && !loadingFuture.isDone()) { + return loadingFuture.thenApply( + ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); + } + + final CompletableFuture newLoadingFuture = new CompletableFuture<>(); + cacheLoadingFuture = newLoadingFuture; + + fetchAndCacheAllProductMetaData() + .whenComplete( + (result, throwable) -> { + if (throwable != null) { + newLoadingFuture.completeExceptionally(throwable); + } else { + newLoadingFuture.complete(null); + } + }); + + return newLoadingFuture.thenApply( + ignored -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); } - return CompletableFuture.completedFuture( - Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); } + /** + * Fetches all product types from CTP and caches their attribute metadata. + * + *

This method is called only once during the first request for attribute metadata. All + * subsequent requests will use the cached data. + * + * @return a {@link CompletionStage} that completes when all product types have been fetched and + * cached + */ @Nonnull - private CompletionStage>> fetchAndCacheProductMetaData( - @Nonnull final String productTypeId) { + private CompletionStage fetchAndCacheAllProductMetaData() { final Consumer> productTypePageConsumer = productTypePage -> productTypePage.forEach( @@ -102,8 +147,7 @@ private CompletionStage>> fetchAndCacheP final ByProjectKeyProductTypesGet byProjectKeyProductTypesGet = this.syncOptions.getCtpClient().productTypes().get(); - return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer) - .thenApply(result -> Optional.ofNullable(productsAttributesMetaData.get(productTypeId))); + return QueryUtils.queryAll(byProjectKeyProductTypesGet, productTypePageConsumer); } @Nonnull