Skip to content

Commit b327fc0

Browse files
committed
prevent thread leak
1 parent 9628937 commit b327fc0

File tree

2 files changed

+39
-31
lines changed

2 files changed

+39
-31
lines changed

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import com.hivemq.adapter.sdk.api.events.model.Event;
2020
import com.hivemq.configuration.service.InternalConfigurations;
2121
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler;
22-
import org.jetbrains.annotations.NotNull;
2322
import com.hivemq.util.ExceptionUtils;
2423
import com.hivemq.util.NanoTimeProvider;
24+
import org.jetbrains.annotations.NotNull;
2525
import org.jetbrains.annotations.VisibleForTesting;
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.RejectedExecutionException;
3131
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.ScheduledFuture;
3233
import java.util.concurrent.ThreadLocalRandom;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.concurrent.atomic.AtomicReference;
3638

3739
public class PollingTask implements Runnable {
3840

@@ -42,12 +44,11 @@ public class PollingTask implements Runnable {
4244
private final @NotNull ScheduledExecutorService scheduledExecutorService;
4345
private final @NotNull EventService eventService;
4446
private final @NotNull NanoTimeProvider nanoTimeProvider;
45-
private final @NotNull AtomicInteger watchdogErrorCount = new AtomicInteger();
46-
private final @NotNull AtomicInteger applicationErrorCount = new AtomicInteger();
47-
47+
private final @NotNull AtomicInteger watchdogErrorCount;
48+
private final @NotNull AtomicInteger applicationErrorCount;
49+
private final @NotNull AtomicBoolean continueScheduling;
50+
private final @NotNull AtomicReference<ScheduledFuture<?>> currentScheduledFuture;
4851
private volatile long nanosOfLastPolling;
49-
private final @NotNull AtomicBoolean continueScheduling = new AtomicBoolean(true);
50-
5152

5253
public PollingTask(
5354
final @NotNull ProtocolAdapterPollingSampler sampler,
@@ -58,6 +59,19 @@ public PollingTask(
5859
this.scheduledExecutorService = scheduledExecutorService;
5960
this.eventService = eventService;
6061
this.nanoTimeProvider = nanoTimeProvider;
62+
this.watchdogErrorCount = new AtomicInteger();
63+
this.applicationErrorCount = new AtomicInteger();
64+
this.continueScheduling = new AtomicBoolean(true);
65+
this.currentScheduledFuture = new AtomicReference<>();
66+
}
67+
68+
private static long getBackoff(final int errorCount) {
69+
//-- This will backoff up to a max of about a day (unless the max provided is less)
70+
final long max = InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get();
71+
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
72+
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);
73+
f = Math.min(f, max);
74+
return f;
6175
}
6276

6377
@Override
@@ -90,6 +104,11 @@ public void run() {
90104

91105
public void stopScheduling() {
92106
continueScheduling.set(false);
107+
// Cancel any currently scheduled future to prevent thread leaks
108+
final ScheduledFuture<?> future = currentScheduledFuture.getAndSet(null);
109+
if (future != null) {
110+
future.cancel(true);
111+
}
93112
}
94113

95114
private void handleInterruptionException(final @NotNull Throwable throwable) {
@@ -99,7 +118,8 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
99118
final var errorCountTotal = watchdogErrorCount.incrementAndGet();
100119
final var stopBecauseOfTooManyErrors =
101120
errorCountTotal > InternalConfigurations.ADAPTER_RUNTIME_WATCHDOG_TIMEOUT_ERRORS_BEFORE_INTERRUPT.get();
102-
final var milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
121+
final var milliSecondsSinceLastPoll =
122+
TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
103123
if (stopBecauseOfTooManyErrors) {
104124
log.warn(
105125
"Detected bad system process {} in sampler {} - terminating process to maintain health ({}ms runtime)",
@@ -121,7 +141,6 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
121141
}
122142
}
123143

124-
125144
private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
126145
final int errorCountTotal = applicationErrorCount.incrementAndGet();
127146
final int maxErrorsBeforeRemoval = sampler.getMaxErrorsBeforeRemoval();
@@ -145,11 +164,12 @@ private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
145164
notifyOnError(sampler, throwable, false);
146165
// no rescheduling
147166
}
148-
149167
}
150168

151169
private void notifyOnError(
152-
final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, final boolean continuing) {
170+
final @NotNull ProtocolAdapterPollingSampler sampler,
171+
final @NotNull Throwable t,
172+
final boolean continuing) {
153173
try {
154174
sampler.error(t, continuing);
155175
} catch (final Throwable samplerError) {
@@ -178,12 +198,10 @@ private void reschedule(final int errorCountTotal) {
178198
}
179199

180200
final long nonNegativeDelay = Math.max(0, delayInMillis);
181-
182201
if (errorCountTotal == 0) {
183202
schedule(nonNegativeDelay);
184203
} else {
185-
final long backoff = getBackoff(errorCountTotal,
186-
InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get());
204+
final long backoff = getBackoff(errorCountTotal);
187205
final long effectiveDelay = Math.max(nonNegativeDelay, backoff);
188206
schedule(effectiveDelay);
189207
}
@@ -193,7 +211,9 @@ private void reschedule(final int errorCountTotal) {
193211
void schedule(final long nonNegativeDelay) {
194212
if (continueScheduling.get()) {
195213
try {
196-
scheduledExecutorService.schedule(this, nonNegativeDelay, TimeUnit.MILLISECONDS);
214+
currentScheduledFuture.set(scheduledExecutorService.schedule(this,
215+
nonNegativeDelay,
216+
TimeUnit.MILLISECONDS));
197217
} catch (final RejectedExecutionException rejectedExecutionException) {
198218
// ignore. This is fine during shutdown.
199219
}
@@ -204,12 +224,4 @@ private void resetErrorStats() {
204224
applicationErrorCount.set(0);
205225
watchdogErrorCount.set(0);
206226
}
207-
208-
private static long getBackoff(final int errorCount, final long max) {
209-
//-- This will backoff up to a max of about a day (unless the max provided is less)
210-
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
211-
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);
212-
f = Math.min(f, max);
213-
return f;
214-
}
215227
}

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/ProtocolAdapterPollingServiceImpl.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,18 @@
2121
import com.hivemq.common.shutdown.ShutdownHooks;
2222
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler;
2323
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingService;
24-
import org.jetbrains.annotations.NotNull;
2524
import com.hivemq.util.NanoTimeProvider;
25+
import jakarta.inject.Inject;
26+
import jakarta.inject.Singleton;
27+
import org.jetbrains.annotations.NotNull;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

29-
import jakarta.inject.Inject;
30-
import jakarta.inject.Singleton;
3131
import java.util.Map;
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ScheduledExecutorService;
3434
import java.util.concurrent.TimeUnit;
3535

36-
/**
37-
* @author Daniel Krüger
38-
*/
3936
@Singleton
4037
public class ProtocolAdapterPollingServiceImpl implements ProtocolAdapterPollingService {
4138

@@ -60,7 +57,8 @@ public ProtocolAdapterPollingServiceImpl(
6057

6158
@Override
6259
public void schedulePolling(final @NotNull ProtocolAdapterPollingSampler sampler) {
63-
final PollingTask pollingTask = new PollingTask(sampler, scheduledExecutorService, eventService, nanoTimeProvider);
60+
final PollingTask pollingTask =
61+
new PollingTask(sampler, scheduledExecutorService, eventService, nanoTimeProvider);
6462
scheduledExecutorService.schedule(pollingTask, sampler.getInitialDelay(), sampler.getUnit());
6563
samplerToTask.put(sampler, pollingTask);
6664
}
@@ -82,7 +80,6 @@ public void stopAllPolling() {
8280
samplerToTask.keySet().forEach(this::stopPolling);
8381
}
8482

85-
8683
private class Shutdown implements HiveMQShutdownHook {
8784
@Override
8885
public @NotNull String name() {
@@ -104,5 +101,4 @@ public void run() {
104101
}
105102
}
106103
}
107-
108104
}

0 commit comments

Comments
 (0)