Skip to content

Commit ad1e422

Browse files
committed
clean Task class and add tests
1 parent 02e1e1f commit ad1e422

File tree

12 files changed

+177
-54
lines changed

12 files changed

+177
-54
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/application/ApplicationBootstrap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ static void main(Artifact artifact, String[] args, boolean debugMode, Applicatio
2828
start.block(0);
2929
t.getOutput().block(0);
3030
if (t.getOutput().isSuccessful())
31-
t.getResult().block(0);
31+
t.getOutput().getResult().block(0);
3232
LCCore.stop(true);
3333
}
3434

net.lecousin.core/src/main/java/net/lecousin/framework/application/launcher/DynamicLibrariesManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private void loadSplashFile(File splashFile) {
138138
Task<byte[], IOException> read = FullReadFileTask.create(splashFile, Task.Priority.URGENT);
139139
read.start();
140140
Task<Void,NoException> load = Task.cpu("Loading splash image", Task.Priority.URGENT, () -> {
141-
ImageIcon img = new ImageIcon(read.getResult());
141+
ImageIcon img = new ImageIcon(read.getOutput().getResult());
142142
if (splash == null) return null;
143143
synchronized (splash) {
144144
while (!splash.isReady())
@@ -303,7 +303,7 @@ private void loadApplicationLibrary(
303303
libraries.put(descr.getGroupId() + ':' + descr.getArtifactId(), lib);
304304
appLib = lib;
305305
Task.cpu("Load library " + lib.descr.getGroupId() + ':' + lib.descr.getArtifactId(), Task.Priority.IMPORTANT,
306-
new LoadLibrary(lib, resolveConflicts.getResult(), addPlugins, splash, stepLoad)).start();
306+
new LoadLibrary(lib, resolveConflicts.getOutput().getResult(), addPlugins, splash, stepLoad)).start();
307307
lib.load.thenStart(Task.cpu("Finishing to initialize", Task.Priority.IMPORTANT, () -> {
308308
if (canStartApp.hasError()) return null;
309309
app.getDefaultLogger().debug("Libraries initialized.");
@@ -806,7 +806,7 @@ public void run() {
806806
Task<Void, NoException> load = Task.cpu(
807807
"Load library " + l.descr.getGroupId() + ':' + l.descr.getArtifactId(),
808808
Task.Priority.IMPORTANT,
809-
new LoadLibrary(l, resolveConflicts.getResult(), null, progress, work));
809+
new LoadLibrary(l, resolveConflicts.getOutput().getResult(), null, progress, work));
810810
load.start();
811811
l.load.onDone(result, () -> l.library);
812812
}, result);

net.lecousin.core/src/main/java/net/lecousin/framework/application/launcher/Launcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,11 @@ private static IAsync<Exception> launchApplication(DynamicLibrariesManager libra
337337
startApp.getCancelEvent().printStackTrace(System.err);
338338
} else {
339339
System.err.println("Error while starting application:");
340-
startApp.getError().printStackTrace(System.err);
340+
startApp.getOutput().getError().printStackTrace(System.err);
341341
}
342342
return null;
343343
}
344-
return startApp.getResult();
344+
return startApp.getOutput().getResult();
345345
}
346346

347347
private static void start(final SplashScreen splash, final String[] args) {

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/AsyncSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public <T2> AsyncSupplier<T2, TError> thenStart(
242242
executable.setInput(res);
243243
task.start();
244244
},
245-
task::setError,
245+
err -> task.setDone(null, err),
246246
task::cancel
247247
);
248248
return task.getOutput();
@@ -282,7 +282,7 @@ public IAsync<TError> thenStart(
282282
executable.setInput(res);
283283
task.start();
284284
},
285-
task::setError,
285+
err -> task.setDone(null, err),
286286
task::cancel
287287
);
288288
return task.getOutput();

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/tasks/drives/FileAccess.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void close() {
104104

105105
public long getPosition() throws IOException {
106106
if (!openTask.isDone()) return 0;
107-
if (!openTask.isSuccessful()) throw openTask.getError();
107+
if (!openTask.isSuccessful()) throw openTask.getOutput().getError();
108108
return channel.position();
109109
}
110110

@@ -118,7 +118,7 @@ public void getSize(final AsyncSupplier<Long,IOException> sp) {
118118
if (openTask.isSuccessful())
119119
sp.unblockSuccess(Long.valueOf(size));
120120
else
121-
sp.unblockError(openTask.getError());
121+
sp.unblockError(openTask.getOutput().getError());
122122
};
123123
openTask.getOutput().onDone(ready);
124124
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/tasks/drives/ReadFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public ReadFile(
4747
@Override
4848
public Integer execute() throws IOException, CancelException {
4949
if (!file.openTask.isSuccessful())
50-
throw file.openTask.getError();
50+
throw file.openTask.getOutput().getError();
5151
int nbRead = 0;
5252
if (pos >= 0)
5353
try { file.channel.position(pos); }

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/Task.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,6 @@ public String getDescription() {
151151
return description;
152152
}
153153

154-
public void setDescription(String descr) {
155-
description = descr;
156-
}
157-
158154
public Application getApplication() {
159155
return app;
160156
}
@@ -181,10 +177,6 @@ public Task<T, TError> setMaxBlockingTimeInNanoBeforeToLog(long nano) {
181177
// --- status, cancel, error, result ---
182178

183179

184-
public byte getStatus() {
185-
return status;
186-
}
187-
188180
public boolean isDone() {
189181
return status == STATUS_DONE && result.isDone();
190182
}
@@ -239,19 +231,7 @@ public void cancel(CancelException reason) {
239231
@Override
240232
public CancelException getCancelEvent() { return cancelling != null ? cancelling : result.getCancelEvent(); }
241233

242-
/** Set this task's error (MUST NOT be started). */
243-
public void setError(TError error) {
244-
status = STATUS_DONE;
245-
result.unblockError(error);
246-
}
247-
248-
public boolean hasError() { return result.hasError(); }
249-
250-
public TError getError() { return result.getError(); }
251-
252-
public T getResult() { return result.getResult(); }
253-
254-
public Output getOutput() { return result; }
234+
public AsyncSupplier<T, TError> getOutput() { return result; }
255235

256236
/** Set this task as done with the given result or error. */
257237
public void setDone(T result, TError error) {
@@ -478,8 +458,13 @@ void transferTo(TaskManager newManager) {
478458
}
479459

480460
void cancelledBecauseExecutorDied(CancelException reason) {
481-
cancelling = reason;
461+
if (cancelling != null)
462+
reason = cancelling;
463+
else
464+
cancelling = reason;
482465
result.cancel(reason);
466+
status = STATUS_DONE;
467+
result.cancelled(reason);
483468
}
484469

485470

@@ -571,13 +556,7 @@ public String toString() {
571556
}
572557

573558
/** Synchronization point holding the result or error of this task. */
574-
public final class Output extends AsyncSupplier<T, TError> {
575-
private Output() {}
576-
577-
public Task<T,TError> getTask() {
578-
return Task.this;
579-
}
580-
559+
private final class Output extends AsyncSupplier<T, TError> {
581560
@Override
582561
public void unblockCancel(CancelException reason) {
583562
Task.this.cancel(reason);
@@ -589,7 +568,7 @@ void cancelled(CancelException reason) {
589568

590569
@Override
591570
public String toString() {
592-
return "Task synchronization point [" + description + "]";
571+
return "Task result [" + description + "]";
593572
}
594573
}
595574

@@ -631,4 +610,32 @@ public static <T, TError extends Exception> Task<T, TError> file(
631610
return new Task<>(Threading.getDrivesManager().getTaskManager(file), description, priority, executable, null);
632611
}
633612

613+
/** Create a task using a pool of threads. */
614+
public static <T, TError extends Exception> Task<T, TError> unmanaged(
615+
String description, Priority priority, Executable<T, TError> executable, Consumer<Pair<T,TError>> ondone
616+
) {
617+
return new Task<>(Threading.getUnmanagedTaskManager(), description, priority, executable, ondone);
618+
}
619+
620+
/** Create a task using a pool of threads. */
621+
public static <T, TError extends Exception> Task<T, TError> unmanaged(
622+
String description, Executable<T, TError> executable, Consumer<Pair<T,TError>> ondone
623+
) {
624+
return new Task<>(Threading.getUnmanagedTaskManager(), description, null, executable, ondone);
625+
}
626+
627+
/** Create a task using a pool of threads. */
628+
public static <T, TError extends Exception> Task<T, TError> unmanaged(
629+
String description, Priority priority, Executable<T, TError> executable
630+
) {
631+
return new Task<>(Threading.getUnmanagedTaskManager(), description, priority, executable, null);
632+
}
633+
634+
/** Create a task using a pool of threads. */
635+
public static <T, TError extends Exception> Task<T, TError> unmanaged(
636+
String description, Executable<T, TError> executable
637+
) {
638+
return new Task<>(Threading.getUnmanagedTaskManager(), description, null, executable, null);
639+
}
640+
634641
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/TaskManager.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public TaskManager(
4646

4747
public final Object getResource() { return resource; }
4848

49-
public TaskManagerMonitor.Configuration getMonitorConfiguration() {
50-
return monitor.getConfiguration();
49+
public TaskManagerMonitor getMonitor() {
50+
return monitor;
5151
}
5252

5353
/** Start the threads of this task manager. */
@@ -251,7 +251,7 @@ void putExecutorAside(TaskExecutor executor) {
251251
executorAside(executor);
252252
}
253253

254-
@SuppressWarnings({"deprecation", "squid:CallToDeprecatedMethod"})
254+
@SuppressWarnings({"deprecation", "squid:CallToDeprecatedMethod", "java:S1181"})
255255
void killExecutor(TaskExecutor executor) {
256256
synchronized (aside) {
257257
if (!aside.remove(executor)) return;
@@ -261,9 +261,14 @@ void killExecutor(TaskExecutor executor) {
261261
s.append("Task stopped at \r\n");
262262
DebugUtil.createStackTrace(s, stack);
263263
Threading.getLogger().error(s.toString());
264-
executor.thread.stop();
265-
if (executor.getCurrentTask() != null)
266-
executor.getCurrentTask().cancel(new CancelException("Task was running since a too long time"));
264+
Task<?, ?> task = executor.getCurrentTask();
265+
if (task != null)
266+
task.cancel(new CancelException("Task was running since a too long time"));
267+
try {
268+
executor.thread.stop();
269+
} catch (Throwable e) {
270+
// ignore
271+
}
267272
}
268273

269274
protected abstract void executorUncaughtException(TaskExecutor executor);

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/TaskManagerMonitor.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,28 @@ public Configuration(
4242
TaskManagerMonitor(TaskManager manager, Configuration config) {
4343
this.manager = manager;
4444
this.config = config;
45-
Monitor monitor = new Monitor();
46-
Thread t = manager.threadFactory.newThread(monitor);
45+
thread = new Monitor();
46+
Thread t = manager.threadFactory.newThread(thread);
4747
t.setName(manager.getName() + " - Task Monitoring");
4848
t.start();
49-
LCCore.get().toClose(monitor);
49+
LCCore.get().toClose(thread);
5050
}
5151

5252
private TaskManager manager;
5353
private Configuration config;
54+
private Monitor thread;
5455

5556
public Configuration getConfiguration() {
5657
return config;
5758
}
5859

60+
/** Check tasks now. */
61+
public void checkNow() {
62+
synchronized (thread.lock) {
63+
thread.lock.notify();
64+
}
65+
}
66+
5967
private class Monitor implements Runnable, Closeable {
6068

6169
private Object lock = new Object();
@@ -117,8 +125,7 @@ private long check(TaskExecutor executor) {
117125
if (executor.aside) return -1;
118126
StringBuilder s = new StringBuilder(2048);
119127
startMessage(s, executor, ms);
120-
s.append("Task ").append(task).append(" is running since ").append(ms)
121-
.append(" ! put the thread aside and start a new thread, current stack:\r\n");
128+
s.append(" ! put the thread aside and start a new thread, current stack:");
122129
DebugUtil.createStackTrace(s, executor.thread.getStackTrace());
123130
appendLocks(s, executor.thread);
124131
Threading.getLogger().warn(s.toString());
@@ -127,7 +134,7 @@ private long check(TaskExecutor executor) {
127134
}
128135
StringBuilder s = new StringBuilder(2048);
129136
startMessage(s, executor, ms);
130-
s.append(" ! kill the thread! current stack:\r\n");
137+
s.append(" ! kill the thread! current stack:");
131138
DebugUtil.createStackTrace(s, executor.thread.getStackTrace());
132139
appendLocks(s, executor.thread);
133140
Threading.getLogger().error(s.toString());

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/IOInMemoryOrFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public AsyncSupplier<Integer,IOException> writeAsync(long pos, ByteBuffer buffer
267267
res -> {
268268
fil.set(file.writeAsync(0, buffer));
269269
IOUtil.listenOnDone(fil.get(), result -> {
270-
Integer r = Integer.valueOf(mem.getResult().intValue() + result.intValue());
270+
Integer r = Integer.valueOf(mem.getOutput().getResult().intValue() + result.intValue());
271271
if (ondone != null) ondone.accept(new Pair<>(r, null));
272272
sp.unblockSuccess(r);
273273
}, sp, ondone);

0 commit comments

Comments
 (0)