Skip to content

Commit 4b49b2a

Browse files
committed
Avoid memory leakage via the thread pool
1 parent c82145b commit 4b49b2a

File tree

3 files changed

+198
-5
lines changed

3 files changed

+198
-5
lines changed

src/main/java/io/mapsmessaging/network/io/impl/serial/SerialEndPoint.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.mapsmessaging.logging.LoggerFactory;
2626
import io.mapsmessaging.network.admin.EndPointJMX;
2727
import io.mapsmessaging.network.io.*;
28+
import io.mapsmessaging.network.io.impl.serial.threads.SerialIoExecutors;
29+
import io.mapsmessaging.network.io.impl.serial.threads.SerialIoPoolHandle;
2830

2931
import java.io.IOException;
3032
import java.io.InputStream;
@@ -34,23 +36,30 @@
3436
import java.util.concurrent.ExecutorService;
3537
import java.util.concurrent.Executors;
3638
import java.util.concurrent.FutureTask;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3740
import java.util.concurrent.locks.LockSupport;
3841

3942
import static com.fazecast.jSerialComm.SerialPort.TIMEOUT_READ_BLOCKING;
4043

4144
public class SerialEndPoint extends EndPoint implements StreamEndPoint {
4245

43-
private final ExecutorService readExecutor = Executors.newFixedThreadPool(1);
44-
private final ExecutorService writeExecutor = Executors.newFixedThreadPool(1);
46+
private final ExecutorService readExecutor;
47+
private final ExecutorService writeExecutor;
4548

4649
private final SerialPort serialPort;
4750
private final OutputStream outputStream;
4851
private final InputStream inputStream;
4952
private final EndPointJMX mbean;
5053
private StreamHandler streamHandler;
54+
private final AtomicBoolean closed;
5155

5256
public SerialEndPoint(long id, EndPointServerStatus server, SerialPort serialPort, SerialConfigDTO config, List<String> jmxPath) {
5357
super(id, server);
58+
closed = new AtomicBoolean(false);
59+
SerialIoPoolHandle pool = SerialIoExecutors.getInstance().acquire(serialPort.getSystemPortName());
60+
readExecutor = pool.getReadExecutor();
61+
writeExecutor = pool.getWriteExecutor();
62+
5463
this.serialPort = serialPort;
5564
name = serialPort.getSystemPortName();
5665
configure(serialPort, config);
@@ -71,6 +80,7 @@ public static void configure(SerialPort serialPort, SerialConfigDTO config) {
7180

7281
@Override
7382
public void close() throws IOException {
83+
closed.set(true);
7484
super.close();
7585
mbean.close();
7686
serialPort.closePort();
@@ -164,12 +174,25 @@ public SerialReader(Selectable selectable) {
164174
}
165175

166176
public void run() {
167-
while (serialPort.bytesAvailable() == 0) {
168-
LockSupport.parkNanos(1000000);
177+
long delayNanos = 1_000_000L; // 1 ms
178+
long maxDelayNanos = 100_000_000L; // 100 ms
179+
180+
while (!closed.get()) {
181+
int available = serialPort.bytesAvailable();
182+
if (available > 0) {
183+
runner.selected(runner, null, SelectionKey.OP_READ);
184+
return;
185+
}
186+
187+
LockSupport.parkNanos(delayNanos);
188+
189+
if (delayNanos < maxDelayNanos) {
190+
delayNanos = Math.min(maxDelayNanos, delayNanos * 2);
191+
}
169192
}
170-
runner.selected(runner, null, SelectionKey.OP_READ);
171193
}
172194
}
195+
173196
//</editor-fold>
174197

175198
//<editor-fold desc="Serial Write Thread task">
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
*
3+
* Copyright [ 2020 - 2024 ] Matthew Buckton
4+
* Copyright [ 2024 - 2025 ] MapsMessaging B.V.
5+
*
6+
* Licensed under the Apache License, Version 2.0 with the Commons Clause
7+
* (the "License"); you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at:
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* https://commonsclause.com/
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.mapsmessaging.network.io.impl.serial.threads;
21+
22+
23+
import java.util.Objects;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ConcurrentMap;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ThreadFactory;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
32+
public final class SerialIoExecutors implements AutoCloseable {
33+
34+
private static final int SHUTDOWN_WAIT_SECONDS = 2;
35+
36+
private static final SerialIoExecutors INSTANCE = new SerialIoExecutors();
37+
38+
private final ConcurrentMap<String, SerialIoPoolHandle> poolsByPortName;
39+
40+
private SerialIoExecutors() {
41+
poolsByPortName = new ConcurrentHashMap<>();
42+
}
43+
44+
public static SerialIoExecutors getInstance() {
45+
return INSTANCE;
46+
}
47+
48+
public SerialIoPoolHandle acquire(String portName) {
49+
String canonicalPortName = canonicalizePortName(portName);
50+
51+
return poolsByPortName.computeIfAbsent(canonicalPortName, key -> {
52+
ThreadFactory readThreadFactory = new SerialThreadFactory("SerialRead", key);
53+
ThreadFactory writeThreadFactory = new SerialThreadFactory("SerialWrite", key);
54+
55+
ExecutorService readExecutor = Executors.newSingleThreadExecutor(readThreadFactory);
56+
ExecutorService writeExecutor = Executors.newSingleThreadExecutor(writeThreadFactory);
57+
58+
return new SerialIoPoolHandle(key, readExecutor, writeExecutor);
59+
});
60+
}
61+
62+
/**
63+
* In this simplified design, close is a no-op for individual handles.
64+
* We keep pools around for the lifetime of the process.
65+
*/
66+
public void release(String portName) {
67+
// Intentionally empty by design.
68+
}
69+
70+
@Override
71+
public void close() {
72+
for (SerialIoPoolHandle handle : poolsByPortName.values()) {
73+
shutdownExecutor(handle.getReadExecutor());
74+
shutdownExecutor(handle.getWriteExecutor());
75+
}
76+
poolsByPortName.clear();
77+
}
78+
79+
private void shutdownExecutor(ExecutorService executorService) {
80+
executorService.shutdown();
81+
try {
82+
boolean terminated = executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
83+
if (!terminated) {
84+
executorService.shutdownNow();
85+
}
86+
} catch (InterruptedException interruptedException) {
87+
executorService.shutdownNow();
88+
Thread.currentThread().interrupt();
89+
}
90+
}
91+
92+
private String canonicalizePortName(String portName) {
93+
Objects.requireNonNull(portName, "portName must not be null");
94+
String trimmed = portName.trim();
95+
if (trimmed.isEmpty()) {
96+
throw new IllegalArgumentException("portName must not be empty");
97+
}
98+
return trimmed;
99+
}
100+
101+
private static final class SerialThreadFactory implements ThreadFactory {
102+
103+
private final String prefix;
104+
private final String portName;
105+
private final AtomicInteger threadNumber;
106+
107+
private SerialThreadFactory(String prefix, String portName) {
108+
this.prefix = prefix;
109+
this.portName = portName;
110+
this.threadNumber = new AtomicInteger(1);
111+
}
112+
113+
@Override
114+
public Thread newThread(Runnable runnable) {
115+
String name = prefix + "-" + portName + "-" + threadNumber.getAndIncrement();
116+
Thread thread = new Thread(runnable, name);
117+
thread.setDaemon(true);
118+
return thread;
119+
}
120+
}
121+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
*
3+
* Copyright [ 2020 - 2024 ] Matthew Buckton
4+
* Copyright [ 2024 - 2025 ] MapsMessaging B.V.
5+
*
6+
* Licensed under the Apache License, Version 2.0 with the Commons Clause
7+
* (the "License"); you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at:
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* https://commonsclause.com/
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package io.mapsmessaging.network.io.impl.serial.threads;
21+
22+
23+
24+
import java.util.concurrent.ExecutorService;
25+
26+
public final class SerialIoPoolHandle {
27+
28+
private final String portName;
29+
private final ExecutorService readExecutor;
30+
private final ExecutorService writeExecutor;
31+
32+
public SerialIoPoolHandle(String portName, ExecutorService readExecutor, ExecutorService writeExecutor) {
33+
this.portName = portName;
34+
this.readExecutor = readExecutor;
35+
this.writeExecutor = writeExecutor;
36+
}
37+
38+
public String getPortName() {
39+
return portName;
40+
}
41+
42+
public ExecutorService getReadExecutor() {
43+
return readExecutor;
44+
}
45+
46+
public ExecutorService getWriteExecutor() {
47+
return writeExecutor;
48+
}
49+
}

0 commit comments

Comments
 (0)