Skip to content

Commit 0b3a603

Browse files
committed
Promises.poll minor refactoring
1 parent d75d1e4 commit 0b3a603

File tree

1 file changed

+13
-14
lines changed

1 file changed

+13
-14
lines changed

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -547,15 +547,23 @@ public static <T> Promise<T> pollOptional(Callable<Optional<? extends T>> codeBl
547547
.map(AtomicReference::get)
548548
.ifPresent( p -> p.cancel(true) )
549549
);
550+
Consumer<Promise<?>> changeCallPromiseRef = p -> {
551+
// If result promise is cancelled after callPromise was set need to stop;
552+
callPromiseRef.set( p );
553+
if (promise.isDone()) {
554+
p.cancel(true);
555+
}
556+
};
557+
550558
RetryContext ctx = RetryContext.initial(retryPolicy);
551-
pollOnce(codeBlock, executor, ctx, promise, callPromiseRef);
559+
pollOnce(codeBlock, executor, ctx, promise, changeCallPromiseRef);
552560
return promise;
553561
}
554562

555563
private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
556564
Executor executor, RetryContext ctx,
557565
CompletablePromise<T> resultPromise,
558-
AtomicReference<Promise<?>> callPromiseRef) {
566+
Consumer<Promise<?>> changeCallPromiseRef) {
559567

560568
// Promise may be cancelled outside of polling
561569
if (resultPromise.isDone()) {
@@ -579,32 +587,23 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
579587
} else {
580588
long finishTime = System.currentTimeMillis();
581589
RetryContext nextCtx = ctx.nextRetry(finishTime - startTime);
582-
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
590+
pollOnce(codeBlock, executor, nextCtx, resultPromise, changeCallPromiseRef);
583591
}
584592
} catch (Exception ex) {
585593
long finishTime = System.currentTimeMillis();
586594
RetryContext nextCtx = ctx.nextRetry(finishTime - startTime, ex);
587-
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
595+
pollOnce(codeBlock, executor, nextCtx, resultPromise, changeCallPromiseRef);
588596
}
589597
};
590598

591599
Supplier<Promise<?>> submittedCall = () -> {
592600
// Call should be done via CompletableTask to let it be interruptible
593601
Promise<?> p = CompletableTask.runAsync(doCall, executor);
594602
if (answer.hasTimeout()) {
595-
p.orTimeout( Duration.ofMillis(Math.max(0, answer.timeoutDelayMillis()) ) );
603+
p.orTimeout( Duration.ofMillis(Math.max(0, answer.timeoutDelayMillis()) ) );
596604
}
597605
return p;
598606
};
599-
600-
Consumer<Promise<?>> changeCallPromiseRef = p -> {
601-
// If result promise is cancelled after callPromise was set need to stop;
602-
callPromiseRef.set( p );
603-
if (resultPromise.isDone()) {
604-
p.cancel(true);
605-
}
606-
};
607-
608607

609608
long backoffDelayMillis = answer.backoffDelayMillis();
610609
if (backoffDelayMillis > 0) {

0 commit comments

Comments
 (0)