Skip to content

Commit 944be8e

Browse files
committed
Fix boundary cases when working with Duration-s that doesnt' fully fit in NANOS and/or SECONDS
1 parent c509513 commit 944be8e

File tree

8 files changed

+205
-27
lines changed

8 files changed

+205
-27
lines changed

src/main/java/net/tascalate/concurrent/DelayPolicy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ default DelayPolicy withUniformJitter() {
7676
}
7777

7878
default DelayPolicy withUniformJitter(long range) {
79+
return withUniformJitter(range, TimeUnit.MILLISECONDS);
80+
}
81+
82+
default DelayPolicy withUniformJitter(long range, TimeUnit timeUnit) {
83+
return withUniformJitter(Timeouts.toDuration(range, timeUnit));
84+
}
85+
86+
default DelayPolicy withUniformJitter(Duration range) {
7987
return new UniformRandomDelayPolicy(this, range);
8088
}
8189

@@ -122,5 +130,9 @@ default DelayPolicy withMaxDelay(long maxDelayMillis) {
122130
default DelayPolicy withFirstRetryNoDelay() {
123131
return new FirstRetryNoDelayPolicy(this);
124132
}
133+
134+
public static boolean isValid(Duration d) {
135+
return !(d.isNegative() || d.isZero());
136+
}
125137

126138
}

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -518,14 +518,14 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
518518
// Call should be done via CompletableTask to let it be interruptible
519519
Promise<?> p = CompletableTask.runAsync(doCall, executor);
520520
Duration timeout = answer.timeout();
521-
if (Timeouts.isValid(timeout)) {
521+
if (DelayPolicy.isValid(timeout)) {
522522
p.orTimeout( timeout );
523523
}
524524
return p;
525525
};
526526

527527
Duration backoffDelay = answer.backoffDelay();
528-
if (Timeouts.isValid(backoffDelay)) {
528+
if (DelayPolicy.isValid(backoffDelay)) {
529529
// Timeout itself
530530
Promise<?> backoff = Timeouts.delay( backoffDelay );
531531
// Invocation after timeout

src/main/java/net/tascalate/concurrent/Timeouts.java

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package net.tascalate.concurrent;
1717

1818
import java.time.Duration;
19+
import java.time.temporal.ChronoUnit;
20+
import java.util.Objects;
1921
import java.util.concurrent.Executors;
2022
import java.util.concurrent.Future;
2123
import java.util.concurrent.ScheduledExecutorService;
@@ -38,10 +40,23 @@ private Timeouts() {}
3840
* the new promise
3941
*/
4042
static Promise<Duration> delay(Duration duration) {
43+
TimeUnit unit;
44+
long amount;
45+
// Try to get value with best precision without throwing ArythmeticException due to overflow
46+
if (duration.compareTo(MAX_BY_NANOS) < 0) {
47+
amount = duration.toNanos();
48+
unit = TimeUnit.NANOSECONDS;
49+
} else if (duration.compareTo(MAX_BY_MILLIS) < 0) {
50+
amount = duration.toMillis();
51+
unit = TimeUnit.MILLISECONDS;
52+
} else {
53+
amount = duration.getSeconds();
54+
unit = TimeUnit.SECONDS;
55+
}
56+
4157
final CompletablePromise<Duration> promise = new CompletablePromise<>();
4258
final Future<?> timeout = scheduler.schedule(
43-
() -> promise.onSuccess(duration),
44-
duration.toNanos(), TimeUnit.NANOSECONDS
59+
() -> promise.onSuccess(duration), amount, unit
4560
);
4661
promise.whenComplete((r, e) -> {
4762
if (null != e) {
@@ -72,10 +87,24 @@ static Promise<Duration> delay(long delay, TimeUnit timeUnit) {
7287
* the new promise
7388
*/
7489
static <T> Promise<T> failAfter(Duration duration) {
90+
TimeUnit unit;
91+
long amount;
92+
// Try to get value with best precision without throwing ArythmeticException due to overflow
93+
if (duration.compareTo(MAX_BY_NANOS) < 0) {
94+
amount = duration.toNanos();
95+
unit = TimeUnit.NANOSECONDS;
96+
} else if (duration.compareTo(MAX_BY_MILLIS) < 0) {
97+
amount = duration.toMillis();
98+
unit = TimeUnit.MILLISECONDS;
99+
} else {
100+
amount = duration.getSeconds();
101+
unit = TimeUnit.SECONDS;
102+
}
103+
75104
final CompletablePromise<T> promise = new CompletablePromise<>();
76105
final Future<?> timeout = scheduler.schedule(
77106
() -> promise.onFailure(new TimeoutException("Timeout after " + duration)),
78-
duration.toNanos(), TimeUnit.NANOSECONDS
107+
amount, unit
79108
);
80109
promise.whenComplete((r, e) -> timeout.cancel(true));
81110
return promise;
@@ -95,11 +124,7 @@ static <T> Promise<T> failAfter(long delay, TimeUnit timeUnit) {
95124
}
96125

97126
static Duration toDuration(long delay, TimeUnit timeUnit) {
98-
return Duration.ofNanos(timeUnit.toNanos(delay));
99-
}
100-
101-
static boolean isValid(Duration d) {
102-
return !(d.isNegative() || d.isZero());
127+
return Duration.of(delay, toChronoUnit(timeUnit));
103128
}
104129

105130
static <T, U> BiConsumer<T, U> timeoutsCleanup(Promise<T> self, Promise<?> timeout, boolean cancelOnTimeout) {
@@ -134,6 +159,31 @@ static <T, E extends Throwable> BiConsumer<T, E> configureDelay(Promise<? extend
134159
};
135160
}
136161

162+
private static ChronoUnit toChronoUnit(TimeUnit unit) {
163+
Objects.requireNonNull(unit, "unit");
164+
switch (unit) {
165+
case NANOSECONDS:
166+
return ChronoUnit.NANOS;
167+
case MICROSECONDS:
168+
return ChronoUnit.MICROS;
169+
case MILLISECONDS:
170+
return ChronoUnit.MILLIS;
171+
case SECONDS:
172+
return ChronoUnit.SECONDS;
173+
case MINUTES:
174+
return ChronoUnit.MINUTES;
175+
case HOURS:
176+
return ChronoUnit.HOURS;
177+
case DAYS:
178+
return ChronoUnit.DAYS;
179+
default:
180+
throw new IllegalArgumentException("Unknown TimeUnit constant");
181+
}
182+
}
183+
184+
private static final Duration MAX_BY_NANOS = Duration.ofNanos(Long.MAX_VALUE);
185+
private static final Duration MAX_BY_MILLIS = Duration.ofMillis(Long.MAX_VALUE);
186+
137187
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
138188
@Override
139189
public Thread newThread(Runnable r) {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package net.tascalate.concurrent.delays;
2+
3+
import java.time.Duration;
4+
import java.time.temporal.ChronoUnit;
5+
import java.util.function.LongBinaryOperator;
6+
7+
class DurationCalcs {
8+
private DurationCalcs() {}
9+
10+
static Duration safeTransform(Duration duration, LongBinaryOperator isConversionSafe, LongBinaryOperator conversion) {
11+
long amount;
12+
int dimIdx;
13+
// Try to get value with best precision without throwing ArythmeticException due to overflow
14+
if (duration.compareTo(MAX_BY_NANOS) < 0) {
15+
amount = duration.toNanos();
16+
dimIdx = 0;
17+
} else if (duration.compareTo(MAX_BY_MILLIS) < 0) {
18+
amount = duration.toMillis();
19+
dimIdx = 2;
20+
} else {
21+
amount = duration.getSeconds();
22+
dimIdx = 3;
23+
}
24+
int count = TIME_DIMENSIONS.length;
25+
for (; dimIdx < count; dimIdx++) {
26+
if (toBoolean(isConversionSafe.applyAsLong(amount, dimIdx))) {
27+
amount = conversion.applyAsLong(amount, dimIdx);
28+
return Duration.of(amount, TIME_DIMENSIONS[dimIdx]);
29+
} else {
30+
amount /= 1000;
31+
// try again on next iteration
32+
}
33+
}
34+
// return max possible value if doesn't fit
35+
return Duration.of(Long.MAX_VALUE, TIME_DIMENSIONS[count - 1]);
36+
37+
}
38+
39+
static long safeExtractAmount(Duration duration, int targetDimIdx) {
40+
long amount;
41+
int sourceDimIdx;
42+
if (duration.compareTo(MAX_BY_NANOS) < 0) {
43+
amount = duration.toNanos();
44+
sourceDimIdx = 0;
45+
} else if (duration.compareTo(MAX_BY_MILLIS) < 0) {
46+
amount = duration.toMillis();
47+
sourceDimIdx = 2;
48+
} else {
49+
amount = duration.getSeconds();
50+
sourceDimIdx = 3;
51+
}
52+
// No conversion necessary
53+
if (sourceDimIdx == targetDimIdx) {
54+
return amount;
55+
}
56+
double factor = Math.pow(1000, sourceDimIdx - targetDimIdx);
57+
if (Long.MAX_VALUE / amount > factor) {
58+
return (long)(amount * factor);
59+
} else {
60+
return Long.MAX_VALUE;
61+
}
62+
}
63+
64+
static long toBoolean(boolean v) {
65+
return v ? 1 : 0;
66+
}
67+
68+
private static boolean toBoolean(long v) {
69+
return v != 0;
70+
}
71+
72+
private static final ChronoUnit[] TIME_DIMENSIONS = new ChronoUnit[]{
73+
ChronoUnit.NANOS, ChronoUnit.MICROS, ChronoUnit.MILLIS, ChronoUnit.SECONDS
74+
};
75+
76+
private static final Duration MAX_BY_NANOS = Duration.ofNanos(Long.MAX_VALUE);
77+
private static final Duration MAX_BY_MILLIS = Duration.ofMillis(Long.MAX_VALUE);
78+
}

src/main/java/net/tascalate/concurrent/delays/ExponentialDelayPolicy.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public ExponentialDelayPolicy(long initialDelayMillis, double multiplier) {
4040
}
4141

4242
public ExponentialDelayPolicy(Duration initialDelay, double multiplier) {
43-
if (initialDelay.toNanos() <= 0) {
43+
if (!DelayPolicy.isValid(initialDelay)) {
4444
throw new IllegalArgumentException("Initial delay must be positive but was: " + initialDelay);
4545
}
4646
this.initialDelay = initialDelay;
@@ -49,6 +49,11 @@ public ExponentialDelayPolicy(Duration initialDelay, double multiplier) {
4949

5050
@Override
5151
public Duration delay(RetryContext context) {
52-
return Duration.ofNanos( (long) (initialDelay.toNanos() * Math.pow(multiplier, context.getRetryCount())) );
52+
double factor = Math.pow(multiplier, context.getRetryCount());
53+
return DurationCalcs.safeTransform(
54+
initialDelay,
55+
(amount, dimIdx) -> DurationCalcs.toBoolean(Long.MAX_VALUE / amount > factor),
56+
(amount, dimIdx) -> (long)(amount * factor)
57+
);
5358
}
5459
}

src/main/java/net/tascalate/concurrent/delays/ProportionalRandomDelayPolicy.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,29 @@ public ProportionalRandomDelayPolicy(DelayPolicy target, Random random) {
4444

4545
public ProportionalRandomDelayPolicy(DelayPolicy target, double multiplier) {
4646
super(target);
47+
if (multiplier <= 0 || multiplier >= 1) {
48+
throw new IllegalArgumentException("Multiplier should be within (0..1) exclusively");
49+
}
4750
this.multiplier = multiplier;
4851
}
4952

5053
public ProportionalRandomDelayPolicy(DelayPolicy target, double multiplier, Random random) {
5154
super(target, random);
55+
if (multiplier <= 0 || multiplier >= 1) {
56+
throw new IllegalArgumentException("Multiplier should be within (0..1) exclusively");
57+
}
5258
this.multiplier = multiplier;
5359
}
5460

5561
@Override
56-
long addRandomJitter(long initialDelay) {
57-
final double randomMultiplier = (1 - 2 * random().nextDouble()) * multiplier;
62+
long addRandomJitter(long initialDelay, double randomizer, int dimIdx) {
63+
double randomMultiplier = (1 - 2 * randomizer) * multiplier;
5864
return (long) (initialDelay * (1 + randomMultiplier));
5965
}
66+
67+
@Override
68+
boolean checkBounds(long initialDelay, double randomizer, int dimIdx) {
69+
double randomMultiplier = (1 - 2 * randomizer) * multiplier;
70+
return Long.MAX_VALUE / initialDelay > randomMultiplier + 1;
71+
}
6072
}

src/main/java/net/tascalate/concurrent/delays/RandomDelayPolicy.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,21 @@ private RandomDelayPolicy(DelayPolicy target, Supplier<Random> randomSource) {
4545
super(target);
4646
this.randomSource = randomSource;
4747
}
48-
48+
4949
@Override
5050
public Duration delay(RetryContext context) {
51-
final Duration initialDelay = target.delay(context);
52-
final long randomDelay = addRandomJitter(initialDelay.toNanos());
53-
return max(Duration.ofNanos(randomDelay), Duration.ZERO);
51+
Duration initialDelay = target.delay(context);
52+
double randomizer = random().nextDouble();
53+
return DurationCalcs.safeTransform(
54+
initialDelay,
55+
(amount, dimIdx) -> DurationCalcs.toBoolean(checkBounds(amount, randomizer, (int)dimIdx)),
56+
(amount, dimIdx) -> addRandomJitter(amount, randomizer, (int)dimIdx)
57+
);
5458
}
55-
56-
abstract long addRandomJitter(long initialDelay);
57-
59+
60+
abstract long addRandomJitter(long initialDelay, double randomizer, int dimIdx);
61+
abstract boolean checkBounds(long initialDelay, double randomizer, int dimIdx);
62+
5863
protected Random random() {
5964
return randomSource.get();
6065
}

src/main/java/net/tascalate/concurrent/delays/UniformRandomDelayPolicy.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
*/
2323
package net.tascalate.concurrent.delays;
2424

25+
import java.time.Duration;
2526
import java.util.Random;
2627

2728
import net.tascalate.concurrent.DelayPolicy;
@@ -32,7 +33,7 @@ public class UniformRandomDelayPolicy extends RandomDelayPolicy {
3233
*/
3334
public static final long DEFAULT_RANDOM_RANGE_MILLIS = 100;
3435

35-
private final long range;
36+
private final Duration range;
3637

3738
public UniformRandomDelayPolicy(DelayPolicy target) {
3839
this(target, DEFAULT_RANDOM_RANGE_MILLIS);
@@ -42,20 +43,35 @@ public UniformRandomDelayPolicy(DelayPolicy target, Random random) {
4243
this(target, DEFAULT_RANDOM_RANGE_MILLIS, random);
4344
}
4445

45-
public UniformRandomDelayPolicy(DelayPolicy target, final long range) {
46+
public UniformRandomDelayPolicy(DelayPolicy target, long range) {
47+
this(target, Duration.ofMillis(range));
48+
}
49+
50+
public UniformRandomDelayPolicy(DelayPolicy target, Duration range) {
4651
super(target);
4752
this.range = range;
4853
}
4954

50-
public UniformRandomDelayPolicy(DelayPolicy target, final long range, Random random) {
55+
public UniformRandomDelayPolicy(DelayPolicy target, long range, Random random) {
56+
this(target, Duration.ofMillis(range), random);
57+
}
58+
59+
public UniformRandomDelayPolicy(DelayPolicy target, Duration range, Random random) {
5160
super(target, random);
5261
this.range = range;
5362
}
5463

5564
@Override
56-
long addRandomJitter(long initialDelay) {
57-
final double uniformRandom = (1 - random().nextDouble() * 2) * range;
58-
return (long) (initialDelay + uniformRandom);
65+
long addRandomJitter(long initialDelay, double randomizer, int dimIdx) {
66+
long rangeNormalized = DurationCalcs.safeExtractAmount(range, dimIdx);
67+
double uniformRandom = (1 - randomizer * 2) * rangeNormalized;
68+
return Math.max(0, (long) (initialDelay + uniformRandom));
5969
}
6070

71+
@Override
72+
boolean checkBounds(long initialDelay, double randomizer, int dimIdx) {
73+
long rangeNormalized = DurationCalcs.safeExtractAmount(range, dimIdx);
74+
double uniformRandom = (1 - randomizer * 2) * rangeNormalized;
75+
return Long.MAX_VALUE - initialDelay > uniformRandom;
76+
}
6177
}

0 commit comments

Comments
 (0)