Skip to content

Commit 2dac726

Browse files
committed
Minor stylistic changes
1 parent bc21475 commit 2dac726

File tree

3 files changed

+56
-72
lines changed

3 files changed

+56
-72
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,6 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
253253
}
254254
CompletionStage<U> returned = fn.apply(r);
255255
// nextStage is in progress
256-
// IMPORTANT: it COULD be shared, but typically is not
257-
// So in very rare case some nasty behavior MAY exist
258-
// if others depends on it
259256

260257
// TEST: There is a race when fn.apply(r) is completed
261258
// normally and nextStage is cancelled before returned is set

src/main/java/net/tascalate/concurrent/TaskExecutorCompletionService.java

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -34,102 +34,90 @@ public TaskExecutorCompletionService(TaskExecutorService executor) {
3434
super(wrapExecutor(executor));
3535
}
3636

37-
@SuppressWarnings("unchecked")
3837
public TaskExecutorCompletionService(TaskExecutorService executor,
3938
BlockingQueue<Promise<V>> completionQueue) {
40-
super(wrapExecutor(executor), (BlockingQueue<Future<V>>)(BlockingQueue<?>)completionQueue);
39+
super(wrapExecutor(executor), cast(completionQueue));
4140
}
4241

42+
@Override
4343
public Promise<V> submit(Callable<V> task) {
4444
return (Promise<V>)super.submit(task);
4545
}
4646

47+
@Override
4748
public Promise<V> submit(Runnable task, V result) {
4849
return (Promise<V>)super.submit(task, result);
4950
}
5051

52+
@Override
5153
public Promise<V> take() throws InterruptedException {
5254
return (Promise<V>)super.take();
5355
}
5456

57+
@Override
5558
public Promise<V> poll() {
5659
return (Promise<V>)super.poll();
5760
}
5861

62+
@Override
5963
public Promise<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
6064
return (Promise<V>)super.poll(timeout, unit);
6165
}
66+
67+
@SuppressWarnings("unchecked")
68+
private static <V> BlockingQueue<Future<V>> cast(BlockingQueue<? extends Future<V>> queue) {
69+
return (BlockingQueue<Future<V>>)queue;
70+
}
6271

6372
private static Executor wrapExecutor(Executor executor) {
6473
if (executor instanceof TaskExecutorService && executor instanceof AbstractExecutorService) {
6574
return executor;
6675
} else {
67-
return new GenericExecutorWrapper(executor);
68-
}
69-
}
70-
71-
static class GenericExecutorWrapper extends AbstractExecutorService {
72-
protected final Executor delegate;
73-
74-
GenericExecutorWrapper(Executor delegate) {
75-
this.delegate = delegate;
76-
}
77-
78-
@Override
79-
public void shutdown() {
80-
}
81-
82-
@Override
83-
public List<Runnable> shutdownNow() {
84-
return Collections.emptyList();
85-
}
86-
87-
@Override
88-
public boolean isShutdown() {
89-
return false;
90-
}
91-
92-
@Override
93-
public boolean isTerminated() {
94-
return false;
95-
}
96-
97-
@Override
98-
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
99-
return false;
100-
}
101-
102-
@Override
103-
public void execute(Runnable command) {
104-
delegate.execute(command);
105-
}
106-
107-
/*
108-
@Override
109-
public <T> Promise<T> submit(Callable<T> task) {
110-
return (Promise<T>)super.submit(task);
111-
}
112-
113-
@Override
114-
public <T> Promise<T> submit(Runnable task, T result) {
115-
return (Promise<T>)super.submit(task, result);
116-
}
117-
118-
@Override
119-
public Promise<?> submit(Runnable task) {
120-
return (Promise<?>)super.submit(task);
121-
}
122-
*/
123-
124-
@Override
125-
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
126-
return newTaskFor(Executors.callable(runnable, value));
76+
return new AbstractExecutorService() {
77+
private volatile boolean terminated;
78+
79+
@Override
80+
public void execute(Runnable command) {
81+
executor.execute(command);
82+
}
83+
84+
@Override
85+
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
86+
return newTaskFor(Executors.callable(runnable, value));
87+
}
88+
89+
@Override
90+
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
91+
return TaskExecutors.newRunnablePromise(this, callable);
92+
}
93+
94+
// Just no-ops to fulfill ExecutorService contract
95+
@Override
96+
public void shutdown() {
97+
terminated = true;
98+
}
99+
100+
@Override
101+
public List<Runnable> shutdownNow() {
102+
terminated = true;
103+
return Collections.emptyList();
104+
}
105+
106+
@Override
107+
public boolean isShutdown() {
108+
return terminated;
109+
}
110+
111+
@Override
112+
public boolean isTerminated() {
113+
return terminated;
114+
}
115+
116+
@Override
117+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
118+
return true;
119+
}
120+
};
127121
}
128-
129-
@Override
130-
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
131-
return TaskExecutors.newRunnablePromise(this, callable);
132-
}
133-
134122
}
135123
}

src/main/java/net/tascalate/concurrent/TaskExecutors.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ static class RunnableCompletableTask<T> extends CompletableTask<T>
190190
super(executor, callable);
191191
}
192192

193-
194193
@Override
195194
public void run() {
196195
runTask();

0 commit comments

Comments
 (0)