Skip to content

Allow ValueCache to work with Publisher DataLoader #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
missedKeyIndexes.add(i);
missedKeys.add(keys.get(i));
missedKeyContexts.add(keyContexts.get(i));
missedQueuedFutures.add(queuedFutures.get(i));
} else {
queuedFutures.get(i).complete(cacheGet.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the code queuedFutures.get(i).complete(cacheGet.get()); here???

Before it never used that parameter in this mutative way (eg completing a value if its not a cache get failure)

Is this the bug ??? Does it only happen on Publishers ???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant find the test code for this fix

Copy link
Collaborator Author

@AlexandreCarlton AlexandreCarlton Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the bug ??? Does it only happen on Publishers ???

Yes - well, one of them (and it does indeed affect only the publishers). This particular line of code is needed if we have a cache hit in our value cache and want to return that - otherwise, we never complete, and our load returns a CompletableFuture that never completes. The way to check this is to comment out this line and re-run in particular DataLoaderValueCacheTest#batch_caching_works_as_expected.

I am not sure that this is the best way to solve this, but it does make the tests go green and doesn't appear to make anything else red - if this will cause other problems then we should add tests for that too (which I'm happy to do).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason it did not work is here AFTER the invokeLoader method is called

    private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> callContexts, List<CompletableFuture<V>> queuedFutures) {
        stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts));
        CompletableFuture<List<V>> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled());
        return batchLoad
                .thenApply(values -> {
                    assertResultSize(keys, values);
                    if (isPublisher() || isMappedPublisher()) {
                        // We have already completed the queued futures by the time the overall batchLoad future has completed.
                        return values;
                    }

eg if its publisher its considered done while the others continue on to turn Trys into values. But I think where you have fixed it is approporiate

}
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/test/java/org/dataloader/DataLoaderValueCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class DataLoaderValueCacheTest {

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we "add" the new tests - we are now testing this against PublisherDataLoaderFactory and MappedPublisherDataLoaderFactory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I get it now - we have tests for non publisher data loaders and publisher data loaders. So its working for the old and the new

public void test_by_default_we_have_no_value_caching(TestDataLoaderFactory factory) {
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions();
Expand Down Expand Up @@ -68,7 +68,7 @@ public void test_by_default_we_have_no_value_caching(TestDataLoaderFactory facto
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void should_accept_a_remote_value_store_for_caching(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache();
List<Collection<String>> loadCalls = new ArrayList<>();
Expand Down Expand Up @@ -113,7 +113,7 @@ public void should_accept_a_remote_value_store_for_caching(TestDataLoaderFactory
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void can_use_caffeine_for_caching(TestDataLoaderFactory factory) {
//
// Mostly to prove that some other CACHE library could be used
Expand Down Expand Up @@ -154,7 +154,7 @@ public void can_use_caffeine_for_caching(TestDataLoaderFactory factory) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void will_invoke_loader_if_CACHE_GET_call_throws_exception(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

Expand Down Expand Up @@ -185,7 +185,7 @@ public CompletableFuture<Object> get(String key) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void will_still_work_if_CACHE_SET_call_throws_exception(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {
@Override
Expand Down Expand Up @@ -214,7 +214,7 @@ public CompletableFuture<Object> set(String key, Object value) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void caching_can_take_some_time_complete(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

Expand Down Expand Up @@ -256,7 +256,7 @@ public CompletableFuture<Object> get(String key) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void batch_caching_works_as_expected(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

Expand Down Expand Up @@ -303,7 +303,7 @@ public CompletableFuture<List<Try<Object>>> getValues(List<String> keys) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void assertions_will_be_thrown_if_the_cache_does_not_follow_contract(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

Expand Down Expand Up @@ -346,7 +346,7 @@ private boolean isAssertionException(CompletableFuture<String> fA) {


@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_caching_is_off_its_never_hit(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
CustomValueCache customValueCache = new CustomValueCache() {
Expand Down Expand Up @@ -380,7 +380,7 @@ public CompletableFuture<Object> get(String key) {
}

@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_everything_is_cached_no_batching_happens(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
AtomicInteger setCalls = new AtomicInteger();
Expand Down Expand Up @@ -423,7 +423,7 @@ public CompletableFuture<List<Object>> setValues(List<String> keys, List<Object>


@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#getWithoutPublisher")
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_batching_is_off_it_still_can_cache(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
AtomicInteger setCalls = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.stream.Stream;

public class TestDataLoaderFactories {

public static Stream<Arguments> get() {
return Stream.of(
Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())),
Expand All @@ -14,12 +15,4 @@ public static Stream<Arguments> get() {
Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory()))
);
}

// TODO: Remove in favour of #get when ValueCache supports Publisher Factories.
public static Stream<Arguments> getWithoutPublisher() {
return Stream.of(
Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())),
Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory()))
);
}
}
Loading