Skip to content

Commit 421658d

Browse files
committed
remove unused stats of TaskManager + few cleaning
1 parent fbb60e3 commit 421658d

File tree

10 files changed

+82
-98
lines changed

10 files changed

+82
-98
lines changed

README.md

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ A thread pool is also available for long running tasks that cannot be split,
6969
or tasks using a library with functionalities that may use several physical resources.
7070
They will be executed in separate threads so they won't block other tasks.
7171

72+
A monitoring of tasks executed is done so if a task is exceeding a maximum time, it is put aside to run a new thread
73+
and continue executing other tasks. After a second maximum time, the thread is killed.
74+
7275
A task should not, but is allowed to block. In this case the blocked thread is interrupted and a new thread
7376
is automatically launched to process other tasks for the same physical resource. Once the task is unblocked,
7477
the thread is resumed as soon as another thread is available and can be stopped. For this, synchronized
7578
sections should be avoided as much as possible (or be very short), instead a _synchronization point_ should
7679
be used.
7780

78-
Different kinds of _synchronization point_ are available in the package net.lecousin.framework.concurrent.synch,
79-
such as JoinPoint, SynchronizationPoint, AsyncWork... They allow to wait for one or more asynchronous operations
81+
Different kinds of _synchronization point_ are available in the package net.lecousin.framework.concurrent.async,
82+
such as JoinPoint, Async, AsyncSupplier... They allow to wait for one or more asynchronous operations
8083
to finish (successfully or not), by listening to them.
8184

8285
By default, the order tasks are executed is based on tasks' priority,
@@ -86,6 +89,10 @@ This may be changed by providing a new implementation of TaskPriorityManager.
8689
The multi-threading system handles CPU and drives tasks, for network asynchronous operations you can
8790
use the library [net.lecousin.framework.network.core](https://github.com/lecousin/java-framework-network-core "java-framework-network-core").
8891

92+
For a better management of drives, you can use the library
93+
[net.lecousin.framework.system](https://github.com/lecousin/java-framework-system "java-framework-system")
94+
which will detect drives, their type and capabilities.
95+
8996
## IO Model
9097

9198
The model provided by Java is very basic and mainly based on streams (reading or writing forward).
@@ -100,7 +107,19 @@ and it can resize the IO can be defined as follow:
100107

101108
public <T extends IO.Writable.Seekable & IO.Resizable> myMethod(T io) { ... }
102109

103-
In addition, the model add asynchronous operations (non-blocking).
110+
In addition, the model add asynchronous operations (non-blocking) to improve multi-threading.
111+
112+
Various kind of I/O are available:
113+
- FileIO for files
114+
- LinkedIO to aggregate several IO into a single one
115+
- SubIO to extract a part as an IO
116+
- buffered implementation such as BufferedIO, PreBufferedReadable, ByteArrayIO, IOInMomoryOrFile...
117+
- OutputToInput allowing to see on one side a writable IO (a producer of data), and on antoher side
118+
a readable IO to consume the data
119+
- and more...
120+
121+
The package net.lecousin.framework.io.util provides also utilities such as AsyncProducer and AsyncConsumer,
122+
BroadcastIO, LimitWriteOperations to queue writing operations...
104123

105124
See the javadoc of package net.lecousin.framework.io for more information.
106125

@@ -132,3 +151,8 @@ memory becomes low, it will ask the implementations to free some memory.
132151

133152
In addition, when an application is idle (doing almost nothing) since several minutes, the MemoryManager may
134153
decide to ask to free some memory to reduce the memory footprint of the application when it is idle.
154+
155+
## Utilities
156+
157+
Various other utilities are also available, such as collections, encoding, XML parsing, more flexible
158+
strings...

net.lecousin.core/src/main/java/net/lecousin/framework/application/launcher/DynamicLibrariesManager.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ private void loadApplicationLibrary(
302302
lib.descr = descr;
303303
libraries.put(descr.getGroupId() + ':' + descr.getArtifactId(), lib);
304304
appLib = lib;
305-
Task.cpu("Load library " + lib.descr.getGroupId() + ':' + lib.descr.getArtifactId(), Task.Priority.IMPORTANT,
306-
new LoadLibrary(lib, resolveConflicts.getOutput().getResult(), addPlugins, splash, stepLoad)).start();
305+
loadLibrary(lib, resolveConflicts.getOutput().getResult(), addPlugins, splash, stepLoad).start();
307306
lib.load.thenStart(Task.cpu("Finishing to initialize", Task.Priority.IMPORTANT, () -> {
308307
if (canStartApp.hasError()) return null;
309308
app.getDefaultLogger().debug("Libraries initialized.");
@@ -532,6 +531,14 @@ private Map<Version, List<Tree.Node<DependencyNode>>> getArtifactVersions(
532531
}
533532
}
534533

534+
private Task<Void, NoException> loadLibrary(
535+
Lib lib, Map<String, LibraryDescriptor> versions, List<LibraryDescriptor> addPlugins,
536+
WorkProgress progress, long work
537+
) {
538+
return Task.cpu("Load library " + lib.descr.getGroupId() + ':' + lib.descr.getArtifactId(), Task.Priority.IMPORTANT,
539+
new LoadLibrary(lib, versions, addPlugins, progress, work));
540+
}
541+
535542
private class LoadLibrary implements Executable<Void, NoException> {
536543
private LoadLibrary(
537544
Lib lib, Map<String, LibraryDescriptor> versions, List<LibraryDescriptor> addPlugins,
@@ -609,8 +616,7 @@ private void load(LibraryDescriptor d, String key, JoinPoint<LibraryManagementEx
609616
l.descr = d;
610617
libraries.put(key, l);
611618
}
612-
Task.cpu("Load library " + l.descr.getGroupId() + ':' + l.descr.getArtifactId(), Task.Priority.IMPORTANT,
613-
new LoadLibrary(l, versions, null, progress, work)).start();
619+
loadLibrary(l, versions, null, progress, work).start();
614620
jp.addToJoin(l.load);
615621
}
616622
}
@@ -803,10 +809,8 @@ public void run() {
803809
resolveConflicts.getOutput().onDone(() -> {
804810
app.getDefaultLogger().debug("Dependencies analyzed, loading and initializing libraries");
805811

806-
Task<Void, NoException> load = Task.cpu(
807-
"Load library " + l.descr.getGroupId() + ':' + l.descr.getArtifactId(),
808-
Task.Priority.IMPORTANT,
809-
new LoadLibrary(l, resolveConflicts.getOutput().getResult(), null, progress, work));
812+
Task<Void, NoException> load =
813+
loadLibrary(l, resolveConflicts.getOutput().getResult(), null, progress, work);
810814
load.start();
811815
l.load.onDone(result, () -> l.library);
812816
}, result);

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/TaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,5 @@ public void debug(StringBuilder s, String type) {
115115
DebugUtil.createStackTrace(s,thread.getStackTrace());
116116
}
117117
}
118-
118+
119119
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/TaskManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ public final List<TaskExecutor> getAllActiveExecutors() {
306306
public final void debug(StringBuilder s) {
307307
getDebugDescription(s);
308308
for (TaskExecutor w : getActiveExecutors())
309-
w.debug(s, "Active");
309+
try { w.debug(s, "Active"); }
310+
catch (Exception t) { /* ignore, because we don't want to do it in a synchronized block, so NPE can happen */ }
310311
s.append("\n - ").append(getInactiveExecutors().size()).append(" thread(s) inactive");
311312
for (TaskExecutor w : blocked)
312313
try { w.debug(s, "Blocked"); }
@@ -318,7 +319,4 @@ public final void debug(StringBuilder s) {
318319

319320
protected abstract void getDebugDescription(StringBuilder s);
320321

321-
/** Print statistics to the given StringBuilder. */
322-
public abstract void printStats(StringBuilder s);
323-
324322
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/Threading.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static Logger getLogger() {
5454
* @param nbUnmanagedThreads number of threads to use for unmanaged tasks.
5555
* If 0 or negative, maximum 100 threads will be used.
5656
*/
57+
@SuppressWarnings("java:S107") // number of parameters
5758
public static void init(
5859
ThreadFactory threadFactory,
5960
Class<? extends TaskPriorityManager> taskPriorityManagerClass,
@@ -301,12 +302,4 @@ public static String debug() {
301302
return s.toString();
302303
}
303304

304-
/** Print statistics. */
305-
public static String printStats() {
306-
StringBuilder s = new StringBuilder();
307-
for (TaskManager tm : resources.values())
308-
tm.printStats(s);
309-
return s.toString();
310-
}
311-
312305
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/fixed/FixedThreadTaskManager.java

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public FixedThreadTaskManager(
4141

4242
@Override
4343
protected void threadingStarted() {
44-
Task.cpu("Close old spare threads for " + getName(), Task.Priority.BACKGROUND, new CloseOldSpace())
44+
Task.cpu("Close old spare threads for " + getName(), Task.Priority.BACKGROUND, new CloseOldSpare())
4545
.executeEvery(60000, 6L * 60000).start();
4646
}
4747

@@ -98,12 +98,6 @@ final Task<?,?> peekNextOrWait() {
9898

9999
protected abstract TaskWorker[] getWorkers();
100100

101-
protected void addSpare(TaskWorker worker) {
102-
synchronized (spare) {
103-
spare.addLast(worker);
104-
}
105-
}
106-
107101
AsyncSupplier<TaskWorker,NoException> getPauseToDo() {
108102
if (pausesToDo.isEmpty()) return null;
109103
synchronized (pausesToDo) {
@@ -154,7 +148,7 @@ protected void unblockedExecutor(TaskExecutor executor) {
154148
if (pause.getResult() != null) { // can be null if we are stopping and just want to unblock this thread
155149
replaceWorkerBySpare(pause.getResult(), (TaskWorker)executor);
156150
synchronized (spare) {
157-
spare.addLast(pause.getResult());
151+
spare.addFirst(pause.getResult());
158152
}
159153
}
160154
}
@@ -183,39 +177,21 @@ protected void getDebugDescription(StringBuilder s) {
183177
s.append("Task Manager: ").append(getName()).append(" (").append(nbThreads).append(" threads)");
184178
}
185179

186-
@Override
187-
public void printStats(StringBuilder s) {
188-
try {
189-
s.append("Task Manager: ").append(getName()).append(" (").append(nbThreads).append(" threads):\r\n");
190-
for (TaskWorker w : getWorkers()) {
191-
s.append(" - Worker ");
192-
w.printStats(s);
193-
}
194-
for (TaskWorker w : spare) {
195-
s.append(" - Spare ");
196-
w.printStats(s);
197-
}
198-
for (TaskExecutor w : getBlockedExecutors()) {
199-
s.append(" - Blocked ");
200-
((TaskWorker)w).printStats(s);
201-
}
202-
} catch (Exception t) {
203-
/* ignore, because we don't want to do it in a synchronized block, so NPE can happen */
204-
}
205-
}
206-
207-
private class CloseOldSpace implements Executable<Void, NoException> {
180+
private class CloseOldSpare implements Executable<Void, NoException> {
208181
@Override
209182
public Void execute() {
210183
synchronized (spare) {
211184
if (spare.size() <= nbThreads) return null;
185+
int maxToStop = (spare.size() - nbThreads) / 3 + 1;
186+
int nbStop = 0;
212187
for (Iterator<TaskWorker> it = spare.iterator(); it.hasNext(); ) {
213188
TaskWorker t = it.next();
214189
if (t.lastUsed > 5 * 60000) {
215190
Threading.getLogger().info("Spare thread not used since more than 5 minutes => stop it");
216191
t.forceStop(true);
217192
spare.removeInstance(t);
218-
return null;
193+
if (++nbStop >= maxToStop)
194+
return null;
219195
}
220196
}
221197
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/fixed/TaskWorker.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ protected void threadLoop() {
7878
if (isAside())
7979
break;
8080
}
81-
StringBuilder s = new StringBuilder();
82-
printStats(s);
83-
System.out.print(s.toString());
8481
}
8582

8683
@Override
@@ -89,20 +86,4 @@ protected void unblocked(long startBlock, long startWait, long endWait, long end
8986
workingTime -= endBlock - startBlock;
9087
waitingTime += endBlock - startBlock;
9188
}
92-
93-
public void printStats(StringBuilder s) {
94-
s.append(thread.getName());
95-
while (s.length() < 30) s.append(' ');
96-
s.append(": ");
97-
s.append(tasksDone);
98-
while (s.length() < 45) s.append(' ');
99-
s.append(" tasks done in ");
100-
s.append(((double)workingTime) / 1000000000);
101-
while (s.length() < 80) s.append(' ');
102-
s.append(" waited ");
103-
s.append(((double)waitingTime) / 1000000000);
104-
s.append(" blocked ");
105-
s.append(((double)blockedTime) / 1000000000);
106-
s.append("\r\n");
107-
}
10889
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/pool/ThreadPoolTaskManager.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public ThreadPoolTaskManager(
4141
long tasksTime = 0;
4242
private long threadCounter = 1;
4343

44+
private String newThreadName() {
45+
return "ThreadPool " + getName() + " - " + (threadCounter++);
46+
}
47+
4448
TaskPriorityManager getPriorityManager() {
4549
return taskPriorityManager;
4650
}
@@ -56,11 +60,8 @@ protected void threadingStarted() {
5660
}
5761

5862
@Override
59-
@SuppressWarnings("squid:S106") // print to console
6063
protected void finishAndStopActiveAndInactiveExecutors() {
61-
StringBuilder s = new StringBuilder();
62-
printStats(s);
63-
System.out.println(s.toString());
64+
// nothing to do
6465
}
6566

6667
@Override
@@ -83,7 +84,7 @@ public boolean allActiveExecutorsStopped() {
8384
protected void add(Task<?, ?> t) {
8485
synchronized (taskPriorityManager) {
8586
if (active.size() < maxThreads)
86-
active.add(new TaskWorker(t, this, "ThreadPool " + getName() + " - " + (threadCounter++)));
87+
active.add(new TaskWorker(t, this, newThreadName()));
8788
else
8889
taskPriorityManager.add(t);
8990
}
@@ -95,7 +96,7 @@ protected void executorUncaughtException(TaskExecutor executor) {
9596
active.remove(executor);
9697
Task<?,?> task = taskPriorityManager.peekNext();
9798
if (task != null)
98-
active.add(new TaskWorker(task, this, "ThreadPool " + getName() + " - " + (threadCounter++)));
99+
active.add(new TaskWorker(task, this, newThreadName()));
99100
}
100101
}
101102

@@ -105,7 +106,7 @@ protected void replaceBlockedExecutor(TaskExecutor executor) {
105106
active.remove(executor);
106107
Task<?,?> task = taskPriorityManager.peekNext();
107108
if (task != null)
108-
active.add(new TaskWorker(task, this, "ThreadPool " + getName() + " - " + (threadCounter++)));
109+
active.add(new TaskWorker(task, this, newThreadName()));
109110
}
110111
}
111112

@@ -122,7 +123,7 @@ protected void executorAside(TaskExecutor executor) {
122123
active.remove(executor);
123124
Task<?,?> task = taskPriorityManager.peekNext();
124125
if (task != null)
125-
active.add(new TaskWorker(task, this, "ThreadPool " + getName() + " - " + (threadCounter++)));
126+
active.add(new TaskWorker(task, this, newThreadName()));
126127
}
127128
}
128129

@@ -144,10 +145,4 @@ protected void getDebugDescription(StringBuilder s) {
144145
.append(active.size()).append('/').append(maxThreads).append(" active threads");
145146
}
146147

147-
@Override
148-
public void printStats(StringBuilder s) {
149-
s.append("Thread pool ").append(getName()).append(": ");
150-
s.append(tasksDone).append(" tasks done in ").append(((double)tasksTime) / 1000000000).append("s.");
151-
}
152-
153148
}

net.lecousin.core/src/main/java/net/lecousin/framework/text/CharArrayStringBuffer.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import net.lecousin.framework.concurrent.async.IAsync;
1616
import net.lecousin.framework.concurrent.threads.Task;
1717
import net.lecousin.framework.concurrent.threads.Task.Priority;
18-
import net.lecousin.framework.exception.NoException;
1918
import net.lecousin.framework.io.IO;
2019
import net.lecousin.framework.io.data.CharArray;
2120
import net.lecousin.framework.io.data.Chars;
@@ -318,16 +317,13 @@ public boolean readUntil(char endChar, IString string) throws IOException {
318317
@Override
319318
public AsyncSupplier<Boolean, IOException> readUntilAsync(char endChar, IString string) {
320319
AsyncSupplier<Boolean, IOException> result = new AsyncSupplier<>();
321-
Task.cpu("UnprotectedStringBuffer.readUntilAsync", getPriority(), new Executable<Void, NoException>() {
322-
@Override
323-
public Void execute() {
324-
try {
325-
result.unblockSuccess(Boolean.valueOf(readUntil(endChar, string)));
326-
} catch (IOException e) {
327-
result.error(e);
328-
}
329-
return null;
320+
Task.cpu("UnprotectedStringBuffer.readUntilAsync", getPriority(), () -> {
321+
try {
322+
result.unblockSuccess(Boolean.valueOf(readUntil(endChar, string)));
323+
} catch (IOException e) {
324+
result.error(e);
330325
}
326+
return null;
331327
}).start();
332328
return result;
333329
}

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/concurrent/threads/TestTaskManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package net.lecousin.framework.core.tests.concurrent.threads;
22

3+
import net.lecousin.framework.concurrent.async.Async;
34
import net.lecousin.framework.concurrent.threads.Task;
45
import net.lecousin.framework.concurrent.threads.TaskManagerMonitor.Configuration;
56
import net.lecousin.framework.concurrent.threads.Threading;
67
import net.lecousin.framework.core.test.LCCoreAbstractTest;
8+
import net.lecousin.framework.exception.NoException;
79

810
import org.junit.Assert;
911
import org.junit.Test;
@@ -31,4 +33,19 @@ public void testPutAsideAndKill() {
3133
Threading.setUnmanagedMonitorConfiguration(previousConfig);
3234
}
3335

36+
@Test
37+
public void testBlockInUnmanaged() throws Exception {
38+
Async<NoException> a = new Async<>();
39+
Async<NoException> b = new Async<>();
40+
Task<?, ?> task = Task.unmanaged("Test blocking task", () -> {
41+
b.unblock();
42+
a.block(0);
43+
return null;
44+
});
45+
task.start();
46+
b.blockThrow(0);
47+
Thread.sleep(500);
48+
a.unblock();
49+
}
50+
3451
}

0 commit comments

Comments
 (0)