Skip to content

Commit cdd700e

Browse files
committed
CHECKPOINT MIGUEL
PollingTask (close scheduled task on cancel) PAStateImpl (last message is now atomicly written) Southbound writing now returns a future to wait on Milo cleanup
1 parent 75bf0d8 commit cdd700e

File tree

3 files changed

+75
-63
lines changed

3 files changed

+75
-63
lines changed

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,42 @@
2020
import com.hivemq.adapter.sdk.api.events.model.Payload;
2121
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
2222
import com.hivemq.edge.modules.api.events.model.EventImpl;
23+
import org.apache.commons.lang3.exception.ExceptionUtils;
2324
import org.jetbrains.annotations.NotNull;
2425
import org.jetbrains.annotations.Nullable;
25-
import org.apache.commons.lang3.exception.ExceptionUtils;
2626

2727
import java.util.concurrent.atomic.AtomicReference;
2828
import java.util.function.Consumer;
2929

3030
public class ProtocolAdapterStateImpl implements ProtocolAdapterState {
31+
private final @NotNull AtomicReference<RuntimeStatus> runtimeStatus;
32+
private final @NotNull AtomicReference<ConnectionStatus> connectionStatus;
33+
private final @NotNull AtomicReference<@Nullable String> lastErrorMessage;
3134
private final @NotNull EventService eventService;
3235
private final @NotNull String adapterId;
3336
private final @NotNull String protocolId;
34-
protected @NotNull AtomicReference<RuntimeStatus> runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED);
35-
protected @NotNull AtomicReference<ConnectionStatus> connectionStatus =
36-
new AtomicReference<>(ConnectionStatus.DISCONNECTED);
37-
protected @Nullable String lastErrorMessage;
38-
private final AtomicReference<Consumer<ConnectionStatus>> connectionStatusListener = new AtomicReference<>();
37+
private final @NotNull AtomicReference<Consumer<ConnectionStatus>> connectionStatusListener;
3938

40-
public ProtocolAdapterStateImpl(final @NotNull EventService eventService,
41-
final @NotNull String adapterId,
42-
final @NotNull String protocolId) {
39+
public ProtocolAdapterStateImpl(
40+
final @NotNull EventService eventService,
41+
final @NotNull String adapterId,
42+
final @NotNull String protocolId) {
4343
this.eventService = eventService;
4444
this.adapterId = adapterId;
4545
this.protocolId = protocolId;
46+
this.runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED);
47+
this.connectionStatus = new AtomicReference<>(ConnectionStatus.DISCONNECTED);
48+
this.lastErrorMessage = new AtomicReference<>(null);
49+
this.connectionStatusListener = new AtomicReference<>();
4650
}
4751

4852
@Override
4953
public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionStatus) {
5054
Preconditions.checkNotNull(connectionStatus);
5155
final var changed = this.connectionStatus.getAndSet(connectionStatus) != connectionStatus;
52-
if(changed) {
56+
if (changed) {
5357
final var listener = connectionStatusListener.get();
54-
if(listener != null) {
58+
if (listener != null) {
5559
listener.accept(connectionStatus);
5660
}
5761
}
@@ -68,52 +72,50 @@ public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionSta
6872
* and the errorMessage to that supplied.
6973
*/
7074
@Override
71-
public void setErrorConnectionStatus(
72-
final @Nullable Throwable t,
73-
final @Nullable String errorMessage) {
74-
final boolean changed = setConnectionStatus(ConnectionStatus.ERROR);
75-
reportErrorMessage( t, errorMessage, changed);
75+
public void setErrorConnectionStatus(final @Nullable Throwable t, final @Nullable String errorMessage) {
76+
reportErrorMessage(t, errorMessage, setConnectionStatus(ConnectionStatus.ERROR));
7677
}
7778

78-
/**
79-
* Sets the last error message associated with the adapter runtime. This is can be sent through the API to
80-
* give an indication of the status of an adapter runtime.
81-
*
82-
* @param errorMessage
83-
*/
8479
@Override
8580
public void reportErrorMessage(
8681
final @Nullable Throwable throwable,
8782
final @Nullable String errorMessage,
8883
final boolean sendEvent) {
89-
this.lastErrorMessage = errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage;
84+
// Sets the last error message associated with the adapter runtime.
85+
// This is can be sent through the API to give an indication of the
86+
// status of an adapter runtime.
87+
lastErrorMessage.set(errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage);
9088
if (sendEvent) {
91-
eventService.createAdapterEvent(adapterId, protocolId)
89+
final var eventBuilder = eventService.createAdapterEvent(adapterId, protocolId)
9290
.withSeverity(EventImpl.SEVERITY.ERROR)
93-
.withMessage(String.format("Adapter '%s' encountered an error.", adapterId))
94-
.withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable))
95-
.fire();
91+
.withMessage(String.format("Adapter '%s' encountered an error.", adapterId));
92+
if (throwable != null) {
93+
eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable));
94+
} else if (errorMessage != null) {
95+
eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, errorMessage);
96+
}
97+
eventBuilder.fire();
9698
}
9799
}
98100

99101
@Override
100-
public void setRuntimeStatus(final @NotNull RuntimeStatus runtimeStatus) {
101-
this.runtimeStatus.set(runtimeStatus);
102+
public @NotNull RuntimeStatus getRuntimeStatus() {
103+
return runtimeStatus.get();
102104
}
103105

104106
@Override
105-
public @NotNull RuntimeStatus getRuntimeStatus() {
106-
return this.runtimeStatus.get();
107+
public void setRuntimeStatus(final @NotNull RuntimeStatus status) {
108+
runtimeStatus.set(status);
107109
}
108110

109111
@Override
110112
public @Nullable String getLastErrorMessage() {
111-
return lastErrorMessage;
113+
return lastErrorMessage.get();
112114
}
113115

114-
public void setConnectionStatusListener(final @NotNull Consumer<ConnectionStatus> connectionStatusListener) {
115-
this.connectionStatusListener.set(connectionStatusListener);
116-
connectionStatusListener.accept(connectionStatus.get());
116+
public void setConnectionStatusListener(final @NotNull Consumer<ConnectionStatus> listener) {
117+
final ConnectionStatus currentStatus = connectionStatus.get();
118+
connectionStatusListener.set(listener);
119+
listener.accept(currentStatus);
117120
}
118-
119121
}

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import com.hivemq.adapter.sdk.api.events.model.Event;
2020
import com.hivemq.configuration.service.InternalConfigurations;
2121
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler;
22-
import org.jetbrains.annotations.NotNull;
2322
import com.hivemq.util.ExceptionUtils;
2423
import com.hivemq.util.NanoTimeProvider;
24+
import org.jetbrains.annotations.NotNull;
2525
import org.jetbrains.annotations.VisibleForTesting;
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.RejectedExecutionException;
3131
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.ScheduledFuture;
3233
import java.util.concurrent.ThreadLocalRandom;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.concurrent.atomic.AtomicReference;
3638

3739
public class PollingTask implements Runnable {
3840

@@ -42,12 +44,11 @@ public class PollingTask implements Runnable {
4244
private final @NotNull ScheduledExecutorService scheduledExecutorService;
4345
private final @NotNull EventService eventService;
4446
private final @NotNull NanoTimeProvider nanoTimeProvider;
45-
private final @NotNull AtomicInteger watchdogErrorCount = new AtomicInteger();
46-
private final @NotNull AtomicInteger applicationErrorCount = new AtomicInteger();
47-
47+
private final @NotNull AtomicInteger watchdogErrorCount;
48+
private final @NotNull AtomicInteger applicationErrorCount;
49+
private final @NotNull AtomicBoolean continueScheduling;
50+
private final @NotNull AtomicReference<ScheduledFuture<?>> currentScheduledFuture;
4851
private volatile long nanosOfLastPolling;
49-
private final @NotNull AtomicBoolean continueScheduling = new AtomicBoolean(true);
50-
5152

5253
public PollingTask(
5354
final @NotNull ProtocolAdapterPollingSampler sampler,
@@ -58,6 +59,19 @@ public PollingTask(
5859
this.scheduledExecutorService = scheduledExecutorService;
5960
this.eventService = eventService;
6061
this.nanoTimeProvider = nanoTimeProvider;
62+
this.watchdogErrorCount = new AtomicInteger();
63+
this.applicationErrorCount = new AtomicInteger();
64+
this.continueScheduling = new AtomicBoolean(true);
65+
this.currentScheduledFuture = new AtomicReference<>();
66+
}
67+
68+
private static long getBackoff(final int errorCount) {
69+
//-- This will backoff up to a max of about a day (unless the max provided is less)
70+
final long max = InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get();
71+
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
72+
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);
73+
f = Math.min(f, max);
74+
return f;
6175
}
6276

6377
@Override
@@ -90,6 +104,10 @@ public void run() {
90104

91105
public void stopScheduling() {
92106
continueScheduling.set(false);
107+
final ScheduledFuture<?> future = currentScheduledFuture.getAndSet(null);
108+
if (future != null) {
109+
future.cancel(true);
110+
}
93111
}
94112

95113
private void handleInterruptionException(final @NotNull Throwable throwable) {
@@ -99,7 +117,8 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
99117
final var errorCountTotal = watchdogErrorCount.incrementAndGet();
100118
final var stopBecauseOfTooManyErrors =
101119
errorCountTotal > InternalConfigurations.ADAPTER_RUNTIME_WATCHDOG_TIMEOUT_ERRORS_BEFORE_INTERRUPT.get();
102-
final var milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
120+
final var milliSecondsSinceLastPoll =
121+
TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling);
103122
if (stopBecauseOfTooManyErrors) {
104123
log.warn(
105124
"Detected bad system process {} in sampler {} - terminating process to maintain health ({}ms runtime)",
@@ -121,7 +140,6 @@ private void handleInterruptionException(final @NotNull Throwable throwable) {
121140
}
122141
}
123142

124-
125143
private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
126144
final int errorCountTotal = applicationErrorCount.incrementAndGet();
127145
final int maxErrorsBeforeRemoval = sampler.getMaxErrorsBeforeRemoval();
@@ -145,11 +163,12 @@ private void handleExceptionDuringPolling(final @NotNull Throwable throwable) {
145163
notifyOnError(sampler, throwable, false);
146164
// no rescheduling
147165
}
148-
149166
}
150167

151168
private void notifyOnError(
152-
final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, final boolean continuing) {
169+
final @NotNull ProtocolAdapterPollingSampler sampler,
170+
final @NotNull Throwable t,
171+
final boolean continuing) {
153172
try {
154173
sampler.error(t, continuing);
155174
} catch (final Throwable samplerError) {
@@ -178,12 +197,10 @@ private void reschedule(final int errorCountTotal) {
178197
}
179198

180199
final long nonNegativeDelay = Math.max(0, delayInMillis);
181-
182200
if (errorCountTotal == 0) {
183201
schedule(nonNegativeDelay);
184202
} else {
185-
final long backoff = getBackoff(errorCountTotal,
186-
InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get());
203+
final long backoff = getBackoff(errorCountTotal);
187204
final long effectiveDelay = Math.max(nonNegativeDelay, backoff);
188205
schedule(effectiveDelay);
189206
}
@@ -193,8 +210,10 @@ private void reschedule(final int errorCountTotal) {
193210
void schedule(final long nonNegativeDelay) {
194211
if (continueScheduling.get()) {
195212
try {
196-
scheduledExecutorService.schedule(this, nonNegativeDelay, TimeUnit.MILLISECONDS);
197-
} catch (final RejectedExecutionException rejectedExecutionException) {
213+
currentScheduledFuture.set(scheduledExecutorService.schedule(this,
214+
nonNegativeDelay,
215+
TimeUnit.MILLISECONDS));
216+
} catch (final RejectedExecutionException ignored) {
198217
// ignore. This is fine during shutdown.
199218
}
200219
}
@@ -204,12 +223,4 @@ private void resetErrorStats() {
204223
applicationErrorCount.set(0);
205224
watchdogErrorCount.set(0);
206225
}
207-
208-
private static long getBackoff(final int errorCount, final long max) {
209-
//-- This will backoff up to a max of about a day (unless the max provided is less)
210-
long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100);
211-
f += ThreadLocalRandom.current().nextInt(0, errorCount * 100);
212-
f = Math.min(f, max);
213-
return f;
214-
}
215226
}

hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717

1818
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService;
1919
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterWritingService;
20-
import com.hivemq.adapter.sdk.api.writing.WritingContext;
2120
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
22-
import com.hivemq.persistence.SingleWriterService;
2321
import org.jetbrains.annotations.NotNull;
2422

2523
import java.util.List;
2624
import java.util.concurrent.CompletableFuture;
27-
import java.util.concurrent.Future;
2825

2926
public interface InternalProtocolAdapterWritingService extends ProtocolAdapterWritingService {
3027

@@ -34,7 +31,9 @@ public interface InternalProtocolAdapterWritingService extends ProtocolAdapterWr
3431
@NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
3532
@NotNull List<InternalWritingContext> writingContexts);
3633

37-
void stopWriting(@NotNull WritingProtocolAdapter writingProtocolAdapter, final @NotNull List<InternalWritingContext> writingContexts);
34+
void stopWriting(
35+
@NotNull WritingProtocolAdapter writingProtocolAdapter,
36+
final @NotNull List<InternalWritingContext> writingContexts);
3837

3938
void addWritingChangedCallback(@NotNull WritingChangedCallback callback);
4039

0 commit comments

Comments
 (0)