Skip to content

Commit 3e71064

Browse files
authored
Added a listener for the connection state to start writing at the correct time (#1121)
1 parent 05c1dc3 commit 3e71064

File tree

2 files changed

+46
-28
lines changed

2 files changed

+46
-28
lines changed

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.commons.lang3.exception.ExceptionUtils;
2626

2727
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.function.Consumer;
2829

2930
public class ProtocolAdapterStateImpl implements ProtocolAdapterState {
3031
private final @NotNull EventService eventService;
@@ -34,6 +35,7 @@ public class ProtocolAdapterStateImpl implements ProtocolAdapterState {
3435
protected @NotNull AtomicReference<ConnectionStatus> connectionStatus =
3536
new AtomicReference<>(ConnectionStatus.DISCONNECTED);
3637
protected @Nullable String lastErrorMessage;
38+
private AtomicReference<Consumer<ConnectionStatus>> connectionStatusListener = new AtomicReference<>();
3739

3840
public ProtocolAdapterStateImpl(final @NotNull EventService eventService,
3941
final @NotNull String adapterId,
@@ -46,7 +48,14 @@ public ProtocolAdapterStateImpl(final @NotNull EventService eventService,
4648
@Override
4749
public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionStatus) {
4850
Preconditions.checkNotNull(connectionStatus);
49-
return this.connectionStatus.getAndSet(connectionStatus) != connectionStatus;
51+
final var changed = this.connectionStatus.getAndSet(connectionStatus) != connectionStatus;
52+
if(changed) {
53+
final var listener = connectionStatusListener.get();
54+
if(listener != null) {
55+
listener.accept(connectionStatus);
56+
}
57+
}
58+
return changed;
5059
}
5160

5261
@Override
@@ -102,4 +111,9 @@ public void setRuntimeStatus(final @NotNull RuntimeStatus runtimeStatus) {
102111
return lastErrorMessage;
103112
}
104113

114+
public void setConnectionStatusListener(Consumer<ConnectionStatus> connectionStatusListener) {
115+
this.connectionStatusListener.set(connectionStatusListener);
116+
connectionStatusListener.accept(connectionStatus.get());
117+
}
118+
105119
}

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.hivemq.adapter.sdk.api.tag.Tag;
3232
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
3333
import com.hivemq.edge.modules.adapters.data.TagManager;
34+
import com.hivemq.edge.modules.adapters.impl.ProtocolAdapterStateImpl;
3435
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingService;
3536
import com.hivemq.persistence.mappings.NorthboundMapping;
3637
import com.hivemq.persistence.mappings.SouthboundMapping;
@@ -48,6 +49,7 @@
4849
import java.util.Optional;
4950
import java.util.concurrent.CompletableFuture;
5051
import java.util.concurrent.ExecutionException;
52+
import java.util.concurrent.atomic.AtomicBoolean;
5153
import java.util.concurrent.atomic.AtomicReference;
5254
import java.util.function.Function;
5355
import java.util.stream.Collectors;
@@ -69,7 +71,7 @@ private enum OperationState {
6971
private final @NotNull ProtocolAdapter adapter;
7072
private final @NotNull ProtocolAdapterFactory<?> adapterFactory;
7173
private final @NotNull ProtocolAdapterInformation adapterInformation;
72-
private final @NotNull ProtocolAdapterState protocolAdapterState;
74+
private final @NotNull ProtocolAdapterStateImpl protocolAdapterState;
7375
private final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService;
7476
private final @NotNull ProtocolAdapterPollingService protocolAdapterPollingService;
7577
private final @NotNull ProtocolAdapterConfig config;
@@ -109,7 +111,7 @@ public ProtocolAdapterWrapper(
109111
final @NotNull ProtocolAdapter adapter,
110112
final @NotNull ProtocolAdapterFactory<?> adapterFactory,
111113
final @NotNull ProtocolAdapterInformation adapterInformation,
112-
final @NotNull ProtocolAdapterState protocolAdapterState,
114+
final @NotNull ProtocolAdapterStateImpl protocolAdapterState,
113115
final @NotNull NorthboundConsumerFactory northboundConsumerFactory,
114116
final @NotNull TagManager tagManager) {
115117
this.protocolAdapterWritingService = protocolAdapterWritingService;
@@ -202,12 +204,20 @@ private Optional<Throwable> attemptStartingConsumers(final boolean writingEnable
202204
//Adapter started successfully, now start the consumers
203205
createAndSubscribeTagConsumer();
204206
startPolling(protocolAdapterPollingService, eventService);
205-
if(startWriting(writingEnabled, protocolAdapterWritingService)) {
206-
log.info("Successfully started adapter with id {}", adapter.getId());
207-
} else {
208-
log.error("Protocol adapter start failed as data hub is not available.");
209-
return Optional.of(new RuntimeException(
210-
"Protocol adapter start failed as data hub is not available."));
207+
208+
if(writingEnabled && isWriting()) {
209+
final var started = new AtomicBoolean(false);
210+
protocolAdapterState.setConnectionStatusListener(status -> {
211+
if(status == ProtocolAdapterState.ConnectionStatus.CONNECTED) {
212+
if(started.compareAndSet(false, true)) {
213+
if(startWriting(protocolAdapterWritingService)) {
214+
log.info("Successfully started adapter with id {}", adapter.getId());
215+
} else {
216+
log.error("Protocol adapter start failed as data hub is not available.");
217+
}
218+
}
219+
}
220+
});
211221
}
212222
} catch (final Throwable e) {
213223
log.error("Protocol adapter start failed");
@@ -432,25 +442,19 @@ private void stopPolling(
432442
}
433443
}
434444

435-
private @NotNull boolean startWriting(
436-
final boolean writingEnabled,
437-
final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {
438-
if (writingEnabled && isWriting()) {
439-
log.debug("Start writing for protocol adapter with id '{}'", getId());
440-
441-
final var southboundMappings = getSouthboundMappings();
442-
final var writingContexts = southboundMappings.stream()
443-
.map(InternalWritingContextImpl::new)
444-
.collect(Collectors.<InternalWritingContext>toList());
445-
446-
return protocolAdapterWritingService
447-
.startWriting(
448-
(WritingProtocolAdapter) getAdapter(),
449-
getProtocolAdapterMetricsService(),
450-
writingContexts);
451-
} else {
452-
return true;
453-
}
445+
private @NotNull boolean startWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {
446+
log.debug("Start writing for protocol adapter with id '{}'", getId());
447+
448+
final var southboundMappings = getSouthboundMappings();
449+
final var writingContexts = southboundMappings.stream()
450+
.map(InternalWritingContextImpl::new)
451+
.collect(Collectors.<InternalWritingContext>toList());
452+
453+
return protocolAdapterWritingService
454+
.startWriting(
455+
(WritingProtocolAdapter) getAdapter(),
456+
getProtocolAdapterMetricsService(),
457+
writingContexts);
454458
}
455459

456460
private void stopWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {

0 commit comments

Comments
 (0)