Skip to content

Commit 356be4b

Browse files
committed
Extract Runnable / RunnableFuture responsibilities away from CompletableTask (make separate hiden class for TaskExecutor);
1 parent daa1a89 commit 356be4b

File tree

7 files changed

+153
-30
lines changed

7 files changed

+153
-30
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ boolean onError(Throwable ex) {
137137
return callbackRegistry.failure(ex);
138138
}
139139

140-
class StageTransition extends FutureTask<T> {
140+
class StageTransition extends FutureTask<T>
141+
implements CompletableFuture.AsynchronousCompletionTask {
141142
StageTransition(Callable<T> callable) {
142143
super(callable);
143144
}

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import java.time.Duration;
2121
import java.util.Set;
2222
import java.util.concurrent.Callable;
23-
import java.util.concurrent.CompletableFuture;
2423
import java.util.concurrent.CompletionStage;
2524
import java.util.concurrent.Executor;
26-
import java.util.concurrent.RunnableFuture;
2725
import java.util.concurrent.TimeUnit;
2826
import java.util.function.Supplier;
2927

@@ -38,9 +36,7 @@
3836
* @param <T>
3937
* a type of the successfully executed task result
4038
*/
41-
public class CompletableTask<T> extends AbstractCompletableTask<T>
42-
implements RunnableFuture<T>,
43-
CompletableFuture.AsynchronousCompletionTask {
39+
public class CompletableTask<T> extends AbstractCompletableTask<T> {
4440

4541
/**
4642
* Creates a CompletableTask; for internal use only
@@ -53,14 +49,6 @@ protected CompletableTask(Executor executor, Callable<T> callable) {
5349
super(executor, callable);
5450
}
5551

56-
/**
57-
* Executes wrapped {@link Callable}; don't use explicitly
58-
*/
59-
@Override
60-
public void run() {
61-
task.run();
62-
}
63-
6452
/**
6553
* Returns a resolved {@link Promise} with specified value; the promise is "bound" to the specified executor.
6654
* I.e. any function passed to composition methods of Promise (like <code>thenApplyAsync</code>
@@ -89,7 +77,7 @@ public void run() {
8977
*/
9078
public static <T> Promise<T> completed(T value, Executor defaultExecutor) {
9179
CompletableTask<T> result = new CompletableTask<>(defaultExecutor, () -> value);
92-
SAME_THREAD_EXECUTOR.execute(result);
80+
SAME_THREAD_EXECUTOR.execute(result.task);
9381
return result;
9482
}
9583

@@ -226,7 +214,7 @@ public static <U> Promise<U> supplyAsync(Supplier<U> supplier, Executor executor
226214
*/
227215
public static <U> Promise<U> submit(Callable<U> call, Executor executor) {
228216
CompletableTask<U> result = new CompletableTask<>(executor, call);
229-
executor.execute(result);
217+
executor.execute(result.task);
230218
return result;
231219
}
232220

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2015-2020 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent;
17+
18+
import java.util.concurrent.RunnableFuture;
19+
20+
public interface RunnablePromise<T> extends Promise<T>, RunnableFuture<T> {
21+
22+
}

src/main/java/net/tascalate/concurrent/TaskExecutorCompletionService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ExecutorCompletionService;
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.Future;
27+
import java.util.concurrent.RunnableFuture;
2728
import java.util.concurrent.TimeUnit;
2829

2930
public class TaskExecutorCompletionService<V> extends ExecutorCompletionService<V>
@@ -121,13 +122,13 @@ public Promise<?> submit(Runnable task) {
121122
*/
122123

123124
@Override
124-
protected <T> CompletableTask<T> newTaskFor(Runnable runnable, T value) {
125+
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
125126
return newTaskFor(Executors.callable(runnable, value));
126127
}
127128

128129
@Override
129-
protected <T> CompletableTask<T> newTaskFor(Callable<T> callable) {
130-
return new CompletableTask<>(this, callable);
130+
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
131+
return TaskExecutors.newRunnablePromise(this, callable);
131132
}
132133

133134
}

src/main/java/net/tascalate/concurrent/TaskExecutors.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.util.List;
2020

2121
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.Executor;
2325
import java.util.concurrent.ExecutorService;
2426
import java.util.concurrent.Executors;
2527
import java.util.concurrent.Future;
@@ -178,6 +180,26 @@ public static TaskExecutorService adapt(ExecutorService executorService) {
178180
}
179181
}
180182

183+
static <T> RunnablePromise<T> newRunnablePromise(Executor executor, Callable<T> callable) {
184+
return new RunnableCompletableTask<>(executor, callable);
185+
}
186+
187+
static class RunnableCompletableTask<T> extends CompletableTask<T>
188+
implements RunnablePromise<T>,
189+
CompletableFuture.AsynchronousCompletionTask{
190+
191+
RunnableCompletableTask(Executor executor, Callable<T> callable) {
192+
super(executor, callable);
193+
}
194+
195+
196+
@Override
197+
public void run() {
198+
task.run();
199+
}
200+
}
201+
202+
181203
static class TaskExecutorServiceAdapter implements TaskExecutorService {
182204
private final ExecutorService delegate;
183205

@@ -210,19 +232,19 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
210232
}
211233

212234
public <T> Promise<T> submit(Callable<T> callable) {
213-
CompletableTask<T> task = createTask(callable);
235+
RunnablePromise<T> task = newRunnablePromise(this, callable);
214236
delegate.execute(task);
215237
return task;
216238
}
217239

218240
public <T> Promise<T> submit(Runnable codeBlock, T result) {
219-
CompletableTask<T> task = createTask(Executors.callable(codeBlock, result));
241+
RunnablePromise<T> task = newRunnablePromise(this, Executors.callable(codeBlock, result));
220242
delegate.execute(task);
221243
return task;
222244
}
223245

224246
public Promise<?> submit(Runnable codeBlock) {
225-
CompletableTask<?> task = createTask(Executors.callable(codeBlock, null));
247+
RunnablePromise<?> task = newRunnablePromise(this, Executors.callable(codeBlock, null));
226248
delegate.execute(task);
227249
return task;
228250
}
@@ -246,10 +268,5 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
246268

247269
return delegate.invokeAny(tasks, timeout, unit);
248270
}
249-
250-
protected <T> CompletableTask<T> createTask(Callable<T> callable) {
251-
return new CompletableTask<T>(this, callable);
252-
}
253-
254271
}
255272
}

src/main/java/net/tascalate/concurrent/ThreadPoolTaskExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Callable;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.RejectedExecutionHandler;
22+
import java.util.concurrent.RunnableFuture;
2223
import java.util.concurrent.ThreadFactory;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425
import java.util.concurrent.TimeUnit;
@@ -81,13 +82,13 @@ public <T> Promise<T> submit(Callable<T> task) {
8182
}
8283

8384
@Override
84-
protected <T> CompletableTask<T> newTaskFor(Runnable runnable, T value) {
85+
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
8586
return newTaskFor(Executors.callable(runnable, value));
8687
}
8788

8889
@Override
89-
protected <T> CompletableTask<T> newTaskFor(Callable<T> callable) {
90-
return new CompletableTask<>(this, callable);
90+
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
91+
return TaskExecutors.newRunnablePromise(this, callable);
9192
}
9293

9394
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package net.tascalate.concurrent.delays;
2+
3+
import java.time.Duration;
4+
import java.util.Iterator;
5+
import java.util.Random;
6+
import java.util.concurrent.CompletionStage;
7+
8+
@SuppressWarnings("unused")
9+
public class NanosTest {
10+
11+
public static void main(String[] args) {
12+
13+
// Duration dd = Duration.ofMillis(Long.MAX_VALUE);
14+
System.out.println(Long.MAX_VALUE);
15+
System.out.println(-Long.MIN_VALUE);
16+
System.out.println(Long.MIN_VALUE);
17+
System.out.println(-Long.MAX_VALUE);
18+
System.out.println("--REF-- = " + Long.MAX_VALUE);
19+
// System.out.println("NANOS = " + DurationCalcs.safeExtractAmount(dd, 0));
20+
// System.out.println("MICROS = " + DurationCalcs.safeExtractAmount(dd, 1));
21+
// System.out.println("MILLIS = " + DurationCalcs.safeExtractAmount(dd, 2));
22+
// System.out.println("SECONDS = " + DurationCalcs.safeExtractAmount(dd, 3));
23+
//
24+
// System.out.println(Duration.ofMillis(Long.MAX_VALUE).getSeconds());
25+
// System.out.println("++");
26+
Duration d = Duration.ofSeconds((long)(Long.MAX_VALUE * 0.95));
27+
System.out.println(d.getSeconds());
28+
System.out.println(random1(d).getSeconds());
29+
System.out.println(random1(d).getSeconds());
30+
System.out.println(random1(d).getSeconds());
31+
System.out.println(random1(d).getSeconds());
32+
System.out.println(random1(d).getSeconds());
33+
System.out.println(random1(d).getSeconds());
34+
}
35+
36+
private static Duration exp(Duration initialDelay, double multiplier, int pow) {
37+
double factor = Math.pow(multiplier, pow);
38+
return DurationCalcs.safeTransform(
39+
initialDelay,
40+
(amount, dimIdx) -> Long.MAX_VALUE / amount > factor ? 1 : 0,
41+
(amount, dimIdx) -> (long)(amount * factor)
42+
);
43+
}
44+
45+
private static Duration random1(Duration initialDelay) {
46+
double randomizer = random.nextDouble();
47+
return DurationCalcs.safeTransform(
48+
initialDelay,
49+
(amount, dimIdx) -> checkBounds1(amount, randomizer) ? 1 : 0,
50+
(amount, dimIdx) -> addRandomJitter1(amount, randomizer)
51+
);
52+
}
53+
54+
private static Duration random2(Duration initialDelay, double multiplier, int pow) {
55+
double randomizer = random.nextDouble();
56+
return DurationCalcs.safeTransform(
57+
initialDelay,
58+
(amount, dimIdx) -> checkBounds1(amount, randomizer) ? 1 : 0,
59+
(amount, dimIdx) -> addRandomJitter1(amount, randomizer)
60+
);
61+
}
62+
63+
64+
static final Random random = new Random();
65+
66+
67+
static long addRandomJitter1(long initialDelay, double randomizer) {
68+
double randomMultiplier = (1 - 2 * randomizer) * 0.1;
69+
System.out.println("::MULT:: " + (long)(initialDelay * (1 + randomMultiplier)));
70+
return Math.max(0, (long) (initialDelay * (1 + randomMultiplier)));
71+
}
72+
73+
static boolean checkBounds1(long initialDelay, double randomizer) {
74+
double randomMultiplier = (1 - 2 * randomizer) * 0.1;
75+
System.out.println("::MULT:: " + (long)(initialDelay * (1 + randomMultiplier)));
76+
return (double)Long.MAX_VALUE / initialDelay > Math.abs(randomMultiplier + 1);
77+
}
78+
79+
/*
80+
if (initialDelay > 0) {
81+
if (uniformRandom > 0)
82+
return Long.MAX_VALUE - initialDelay > uniformRandom; //+MAX > +A + +B
83+
else
84+
return Long.MIN_VALUE + initialDelay < uniformRandom; //-MIN < -A - (+B)
85+
} else {
86+
if (uniformRandom < 0)
87+
return Long.MIN_VALUE - initialDelay < uniformRandom; //-MIN < -A + (-B)
88+
else
89+
return Long.MAX_VALUE + initialDelay > uniformRandom; //+MAX > +A - (-B)
90+
}
91+
92+
*/
93+
}

0 commit comments

Comments
 (0)