Skip to content

Commit c82145b

Browse files
committed
Avoid memory leakage via the thread pool
1 parent b9acabc commit c82145b

File tree

5 files changed

+26
-30
lines changed

5 files changed

+26
-30
lines changed

src/main/java/io/mapsmessaging/network/protocol/impl/satellite/TaskManager.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,36 @@
1919

2020
package io.mapsmessaging.network.protocol.impl.satellite;
2121

22+
import io.mapsmessaging.auth.AuthManager;
23+
2224
import java.util.concurrent.*;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426

27+
28+
@SuppressWarnings("java:S6548") // yes it is a singleton
2529
public class TaskManager {
30+
31+
private static class Holder {
32+
static final TaskManager INSTANCE = new TaskManager();
33+
}
34+
public static TaskManager getInstance() {
35+
return Holder.INSTANCE;
36+
}
37+
2638
private static final AtomicInteger count = new AtomicInteger(1);
2739

2840
private final ExecutorService existingPool;
2941
private final ScheduledExecutorService scheduler;
3042

31-
public TaskManager() {
43+
private TaskManager() {
3244
this(4);
3345
}
3446

35-
public TaskManager(int poolSize) {
47+
private TaskManager(int poolSize) {
3648
existingPool= Executors.newFixedThreadPool(4, r -> new Thread(r, "Satellite-Executors-" + count.getAndIncrement()));
3749
scheduler = new ScheduledThreadPoolExecutor(poolSize, ((ThreadPoolExecutor) existingPool).getThreadFactory());
3850
}
3951

40-
public void close(){
41-
scheduler.shutdown();
42-
existingPool.shutdown();
43-
}
44-
4552
public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
4653
return scheduler.schedule(task, delay, unit);
4754
}

src/main/java/io/mapsmessaging/network/protocol/impl/satellite/gateway/io/GatewayManager.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public class GatewayManager {
5151
private final Map<String, RemoteDeviceInfo> knownTerminals;
5252
private final AtomicBoolean closed = new AtomicBoolean(false);
5353

54-
private TaskManager taskManager;
5554
private ScheduledFuture<?> scheduledFuture;
5655

5756

@@ -68,8 +67,7 @@ public GatewayManager(SatelliteConfigDTO satelliteConfigDTO, IncomingMessageHand
6867
}
6968

7069
public void start() {
71-
taskManager = new TaskManager();
72-
taskManager.schedule(this::initSession, 10, TimeUnit.SECONDS);
70+
TaskManager.getInstance().schedule(this::initSession, 10, TimeUnit.SECONDS);
7371
}
7472

7573
public void stop(){
@@ -78,7 +76,6 @@ public void stop(){
7876
scheduledFuture.cancel(true);
7977
scheduledFuture = null;
8078
}
81-
taskManager.close();
8279
}
8380

8481
protected void initSession(){
@@ -97,15 +94,15 @@ protected void initSession(){
9794
for (RemoteDeviceInfo terminal : knownTerminals.values()) {
9895
handler.registerTerminal(terminal);
9996
}
100-
scheduledFuture = taskManager.schedule(this::pollGateway, pollInterval, TimeUnit.SECONDS);
97+
scheduledFuture = TaskManager.getInstance().schedule(this::pollGateway, pollInterval, TimeUnit.SECONDS);
10198
} else {
10299
logger.log(OGWS_FAILED_AUTHENTICATION);
103100
}
104101
}
105102
catch (RuntimeException| IOException e) {
106103
// Todo log this
107104
// we can try again
108-
taskManager.schedule(this::initSession, 1, TimeUnit.MINUTES);
105+
TaskManager.getInstance().schedule(this::initSession, 1, TimeUnit.MINUTES);
109106
} catch (InterruptedException e) {
110107
Thread.currentThread().interrupt();
111108
} catch (LoginException e) {
@@ -150,7 +147,7 @@ protected void pollGateway() {
150147
}
151148
}
152149
finally {
153-
scheduledFuture = taskManager.schedule(this::pollGateway, pollInterval, TimeUnit.SECONDS);
150+
scheduledFuture = TaskManager.getInstance().schedule(this::pollGateway, pollInterval, TimeUnit.SECONDS);
154151
}
155152
}
156153

src/main/java/io/mapsmessaging/network/protocol/impl/satellite/gateway/protocol/SatelliteGatewayProtocol.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060

6161
public class SatelliteGatewayProtocol extends Protocol {
6262

63-
private final TaskManager taskManager;
6463
private final Logger logger = LoggerFactory.getLogger(SatelliteGatewayProtocol.class);
6564
private final SatelliteMessageRebuilder messageRebuilder;
6665
private final Session session;
@@ -88,7 +87,6 @@ public SatelliteGatewayProtocol(@NonNull @NotNull EndPoint endPoint, @NotNull @N
8887
pendingMessages.set(new LinkedHashMap<>());
8988
priorityMessages = new AtomicReference<>();
9089
priorityMessages.set(new LinkedHashMap<>());
91-
taskManager = new TaskManager();
9290
String primeId = ((SatelliteEndPoint) endPoint).getTerminalInfo().getUniqueId();
9391
SatelliteConfigDTO config = (SatelliteConfigDTO) protocolConfig;
9492
sendHighPriorityEvents = config.isSendHighPriorityMessages();
@@ -151,7 +149,7 @@ public SatelliteGatewayProtocol(@NonNull @NotNull EndPoint endPoint, @NotNull @N
151149
}
152150
((SatelliteEndPoint) endPoint).unmute();
153151
nextOutgoingTime = System.currentTimeMillis() + outgoingPollInterval;
154-
scheduledFuture = taskManager.schedule(this::processOutstandingMessages, 15, TimeUnit.SECONDS);
152+
scheduledFuture = TaskManager.getInstance().schedule(this::processOutstandingMessages, 15, TimeUnit.SECONDS);
155153
}
156154

157155
private String parsePath(String path, String defaultValue, String primeId, String mailboxId){
@@ -172,7 +170,6 @@ public void close() throws IOException {
172170
closed = true;
173171
SessionManager.getInstance().close(session, false);
174172
super.close();
175-
taskManager.close();
176173
}
177174
}
178175

@@ -321,7 +318,7 @@ private void processOutstandingMessages() {
321318
// Log this
322319
}
323320
} finally {
324-
scheduledFuture = taskManager.schedule(this::processOutstandingMessages, 15, TimeUnit.SECONDS);
321+
scheduledFuture = TaskManager.getInstance().schedule(this::processOutstandingMessages, 15, TimeUnit.SECONDS);
325322
}
326323
}
327324

src/main/java/io/mapsmessaging/network/protocol/impl/satellite/modem/device/Modem.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public class Modem {
6161
private final Queue<Command> commandQueue = new ArrayDeque<>();
6262
private final StringBuilder responseBuffer = new StringBuilder();
6363
private final long modemTimeout;
64-
private final TaskManager taskManager;
6564

6665
@Getter
6766
private final ModemStreamHandler streamHandler;
@@ -78,18 +77,17 @@ public class Modem {
7877
@Setter
7978
private boolean oneShotResponse;
8079

81-
public Modem(Consumer<Packet> packetSender, long modemTimeout, ModemStreamHandler streamHandler, TaskManager taskManager) {
80+
public Modem(Consumer<Packet> packetSender, long modemTimeout, ModemStreamHandler streamHandler) {
8281
this.streamHandler = streamHandler;
8382
this.packetSender = packetSender;
84-
this.taskManager = taskManager;
8583
oneShotResponse = false;
8684
currentHandler = new TextResponseHandler(this::handleLine);
8785
if (modemTimeout < 10000 || modemTimeout > 120000) {
8886
modemTimeout = 15000;
8987
}
9088
this.modemTimeout = modemTimeout;
9189

92-
future = taskManager.schedule(this::scanTimeouts, modemTimeout, TimeUnit.MILLISECONDS);
90+
future = TaskManager.getInstance().schedule(this::scanTimeouts, modemTimeout, TimeUnit.MILLISECONDS);
9391
}
9492

9593
public void close() {
@@ -111,7 +109,7 @@ private void scanTimeouts() {
111109
currentCommand.future.completeExceptionally(new IOException("Modem has been closed"));
112110
}
113111
} finally {
114-
future = taskManager.schedule(this::scanTimeouts, modemTimeout, TimeUnit.MILLISECONDS);
112+
future = TaskManager.getInstance().schedule(this::scanTimeouts, modemTimeout, TimeUnit.MILLISECONDS);
115113
}
116114
}
117115

src/main/java/io/mapsmessaging/network/protocol/impl/satellite/modem/protocol/StoGiProtocol.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ public class StoGiProtocol extends Protocol implements Consumer<Packet> {
7676

7777
private static final String STOGI = "stogi";
7878

79-
private final TaskManager taskManager;
8079
private final Logger logger = LoggerFactory.getLogger(StoGiProtocol.class);
8180
private final Session session;
8281
private final SelectorTask selectorTask;
@@ -112,7 +111,6 @@ public StoGiProtocol(EndPoint endPoint, Packet packet) throws LoginException, IO
112111
super(endPoint, endPoint.getConfig().getProtocolConfig(STOGI));
113112
satelliteOnline = false;
114113
satelliteOnlineCount = 0;
115-
taskManager = new TaskManager();
116114
satelliteMessageRebuilder = new SatelliteMessageRebuilder();
117115
messageId = 0;
118116
currentList = new ArrayList<>();
@@ -144,7 +142,7 @@ public StoGiProtocol(EndPoint endPoint, Packet packet) throws LoginException, IO
144142

145143
sinNumber = modemConfig.getSinNumber();
146144
messageLifeTime = modemConfig.getMessageLifeTimeInMinutes();
147-
modem = new Modem(this, modemConfig.getModemResponseTimeout(), streamHandler, taskManager);
145+
modem = new Modem(this, modemConfig.getModemResponseTimeout(), streamHandler);
148146

149147
if(!modemConfig.getSharedSecret().trim().isEmpty()) {
150148
cipherManager = new CipherManager(modemConfig.getSharedSecret().getBytes());
@@ -174,7 +172,7 @@ public StoGiProtocol(EndPoint endPoint, Packet packet) throws LoginException, IO
174172
}
175173
lastOutgoingMessagePollInterval = System.currentTimeMillis();
176174
statsManager = new StatsManager(modem, locationPollInterval, destination);
177-
scheduledFuture = taskManager.schedule(this::pollModemForMessages, messagePoll, TimeUnit.MILLISECONDS);
175+
scheduledFuture = TaskManager.getInstance().schedule(this::pollModemForMessages, messagePoll, TimeUnit.MILLISECONDS);
178176
logger.log(STOGI_STARTED_SESSION, modem.getModemProtocol().getType(), messagePoll,outgoingMessagePollInterval );
179177
}
180178

@@ -212,7 +210,7 @@ private void pollModemForMessages() {
212210
logger.log(STOGI_POLL_RAISED_EXCEPTION, th);
213211
} finally {
214212
poll = hasOutgoing?500:poll;
215-
scheduledFuture = taskManager.schedule(this::pollModemForMessages, poll, TimeUnit.MILLISECONDS);
213+
scheduledFuture = TaskManager.getInstance().schedule(this::pollModemForMessages, poll, TimeUnit.MILLISECONDS);
216214
startTime = System.currentTimeMillis() - startTime;
217215
logger.log(STOGI_POLL_FOR_ACTIONS, startTime, poll);
218216
}
@@ -228,7 +226,6 @@ public void close() throws IOException {
228226
modem.close();
229227
}
230228
super.close();
231-
taskManager.close();
232229
}
233230

234231
private Session setupSession() throws LoginException, IOException {

0 commit comments

Comments
 (0)