Skip to content

Commit 74567fe

Browse files
committed
Making the Subscribers use a common base class
1 parent a93112a commit 74567fe

File tree

1 file changed

+88
-79
lines changed

1 file changed

+88
-79
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 88 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -618,24 +618,23 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
618618
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
619619
}
620620

621-
private class DataLoaderSubscriber implements Subscriber<V> {
621+
private abstract class DataLoaderSubscriberBase<T> implements Subscriber<T> {
622622

623-
private final CompletableFuture<List<V>> valuesFuture;
624-
private final List<K> keys;
625-
private final List<Object> callContexts;
626-
private final List<CompletableFuture<V>> queuedFutures;
623+
final CompletableFuture<List<V>> valuesFuture;
624+
final List<K> keys;
625+
final List<Object> callContexts;
626+
final List<CompletableFuture<V>> queuedFutures;
627627

628-
private final List<K> clearCacheKeys = new ArrayList<>();
629-
private final List<V> completedValues = new ArrayList<>();
630-
private int idx = 0;
631-
private boolean onErrorCalled = false;
632-
private boolean onCompleteCalled = false;
628+
List<K> clearCacheKeys = new ArrayList<>();
629+
List<V> completedValues = new ArrayList<>();
630+
boolean onErrorCalled = false;
631+
boolean onCompleteCalled = false;
633632

634-
private DataLoaderSubscriber(
635-
CompletableFuture<List<V>> valuesFuture,
636-
List<K> keys,
637-
List<Object> callContexts,
638-
List<CompletableFuture<V>> queuedFutures
633+
DataLoaderSubscriberBase(
634+
CompletableFuture<List<V>> valuesFuture,
635+
List<K> keys,
636+
List<Object> callContexts,
637+
List<CompletableFuture<V>> queuedFutures
639638
) {
640639
this.valuesFuture = valuesFuture;
641640
this.keys = keys;
@@ -648,40 +647,87 @@ public void onSubscribe(Subscription subscription) {
648647
subscription.request(keys.size());
649648
}
650649

651-
// onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
652-
// correctness (at the cost of speed).
653650
@Override
654-
public synchronized void onNext(V value) {
651+
public void onNext(T v) {
655652
assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked.");
656653
assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked.");
654+
}
657655

658-
K key = keys.get(idx);
659-
Object callContext = callContexts.get(idx);
660-
CompletableFuture<V> future = queuedFutures.get(idx);
656+
@Override
657+
public void onComplete() {
658+
assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked.");
659+
onCompleteCalled = true;
660+
}
661+
662+
@Override
663+
public void onError(Throwable throwable) {
664+
assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked.");
665+
onErrorCalled = true;
666+
667+
stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts));
668+
}
669+
670+
/*
671+
* A value has arrived - how do we complete the future that's associated with it in a common way
672+
*/
673+
void onNextValue(K key, V value, Object callContext, CompletableFuture<V> future) {
661674
if (value instanceof Try) {
662675
// we allow the batch loader to return a Try so we can better represent a computation
663676
// that might have worked or not.
677+
//noinspection unchecked
664678
Try<V> tryValue = (Try<V>) value;
665679
if (tryValue.isSuccess()) {
666680
future.complete(tryValue.get());
667681
} else {
668682
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
669683
future.completeExceptionally(tryValue.getThrowable());
670-
clearCacheKeys.add(keys.get(idx));
684+
clearCacheKeys.add(key);
671685
}
672686
} else {
673687
future.complete(value);
674688
}
689+
}
690+
691+
Throwable unwrapThrowable(Throwable ex) {
692+
if (ex instanceof CompletionException) {
693+
ex = ex.getCause();
694+
}
695+
return ex;
696+
}
697+
}
698+
699+
private class DataLoaderSubscriber extends DataLoaderSubscriberBase<V> {
700+
701+
private int idx = 0;
702+
703+
private DataLoaderSubscriber(
704+
CompletableFuture<List<V>> valuesFuture,
705+
List<K> keys,
706+
List<Object> callContexts,
707+
List<CompletableFuture<V>> queuedFutures
708+
) {
709+
super(valuesFuture, keys, callContexts, queuedFutures);
710+
}
711+
712+
// onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
713+
// correctness (at the cost of speed).
714+
@Override
715+
public synchronized void onNext(V value) {
716+
super.onNext(value);
717+
718+
K key = keys.get(idx);
719+
Object callContext = callContexts.get(idx);
720+
CompletableFuture<V> future = queuedFutures.get(idx);
721+
onNextValue(key, value, callContext, future);
675722

676723
completedValues.add(value);
677724
idx++;
678725
}
679726

727+
680728
@Override
681729
public void onComplete() {
682-
assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked.");
683-
onCompleteCalled = true;
684-
730+
super.onComplete();
685731
assertResultSize(keys, completedValues);
686732

687733
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
@@ -690,13 +736,8 @@ public void onComplete() {
690736

691737
@Override
692738
public void onError(Throwable ex) {
693-
assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked.");
694-
onErrorCalled = true;
695-
696-
stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts));
697-
if (ex instanceof CompletionException) {
698-
ex = ex.getCause();
699-
}
739+
super.onError(ex);
740+
ex = unwrapThrowable(ex);
700741
// Set the remaining keys to the exception.
701742
for (int i = idx; i < queuedFutures.size(); i++) {
702743
K key = keys.get(i);
@@ -706,32 +747,23 @@ public void onError(Throwable ex) {
706747
dataLoader.clear(key);
707748
}
708749
}
750+
709751
}
710752

711-
private class DataLoaderMapEntrySubscriber implements Subscriber<Map.Entry<K, V>> {
712-
private final CompletableFuture<List<V>> valuesFuture;
713-
private final List<K> keys;
714-
private final List<Object> callContexts;
715-
private final List<CompletableFuture<V>> queuedFutures;
753+
private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase<Map.Entry<K, V>> {
754+
716755
private final Map<K, Object> callContextByKey;
717756
private final Map<K, CompletableFuture<V>> queuedFutureByKey;
718-
719-
private final List<K> clearCacheKeys = new ArrayList<>();
720757
private final Map<K, V> completedValuesByKey = new HashMap<>();
721-
private boolean onErrorCalled = false;
722-
private boolean onCompleteCalled = false;
758+
723759

724760
private DataLoaderMapEntrySubscriber(
725-
CompletableFuture<List<V>> valuesFuture,
726-
List<K> keys,
727-
List<Object> callContexts,
728-
List<CompletableFuture<V>> queuedFutures
761+
CompletableFuture<List<V>> valuesFuture,
762+
List<K> keys,
763+
List<Object> callContexts,
764+
List<CompletableFuture<V>> queuedFutures
729765
) {
730-
this.valuesFuture = valuesFuture;
731-
this.keys = keys;
732-
this.callContexts = callContexts;
733-
this.queuedFutures = queuedFutures;
734-
766+
super(valuesFuture,keys,callContexts,queuedFutures);
735767
this.callContextByKey = new HashMap<>();
736768
this.queuedFutureByKey = new HashMap<>();
737769
for (int idx = 0; idx < queuedFutures.size(); idx++) {
@@ -743,42 +775,24 @@ private DataLoaderMapEntrySubscriber(
743775
}
744776
}
745777

746-
@Override
747-
public void onSubscribe(Subscription subscription) {
748-
subscription.request(keys.size());
749-
}
750778

751779
@Override
752780
public void onNext(Map.Entry<K, V> entry) {
753-
assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked.");
754-
assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked.");
781+
super.onNext(entry);
755782
K key = entry.getKey();
756783
V value = entry.getValue();
757784

758785
Object callContext = callContextByKey.get(key);
759786
CompletableFuture<V> future = queuedFutureByKey.get(key);
760-
if (value instanceof Try) {
761-
// we allow the batch loader to return a Try so we can better represent a computation
762-
// that might have worked or not.
763-
Try<V> tryValue = (Try<V>) value;
764-
if (tryValue.isSuccess()) {
765-
future.complete(tryValue.get());
766-
} else {
767-
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
768-
future.completeExceptionally(tryValue.getThrowable());
769-
clearCacheKeys.add(key);
770-
}
771-
} else {
772-
future.complete(value);
773-
}
787+
788+
onNextValue(key, value, callContext, future);
774789

775790
completedValuesByKey.put(key, value);
776791
}
777792

778793
@Override
779794
public void onComplete() {
780-
assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked.");
781-
onCompleteCalled = true;
795+
super.onComplete();
782796

783797
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
784798
List<V> values = new ArrayList<>(keys.size());
@@ -791,13 +805,8 @@ public void onComplete() {
791805

792806
@Override
793807
public void onError(Throwable ex) {
794-
assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked.");
795-
onErrorCalled = true;
796-
797-
stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts));
798-
if (ex instanceof CompletionException) {
799-
ex = ex.getCause();
800-
}
808+
super.onError(ex);
809+
ex = unwrapThrowable(ex);
801810
// Complete the futures for the remaining keys with the exception.
802811
for (int idx = 0; idx < queuedFutures.size(); idx++) {
803812
K key = keys.get(idx);

0 commit comments

Comments
 (0)