Skip to content

Commit 8782192

Browse files
authored
Merge branch 'master' into 27217-file-input-adapter-fields
2 parents 475cf6b + 0352e93 commit 8782192

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
3838
import com.hivemq.adapter.sdk.api.writing.WritingContext;
3939
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
40-
import com.hivemq.bootstrap.factories.WritingServiceProvider;
4140
import com.hivemq.configuration.service.ConfigurationService;
4241
import com.hivemq.edge.HiveMQEdgeRemoteService;
4342
import com.hivemq.edge.VersionProvider;
@@ -318,8 +317,7 @@ CompletableFuture<Void> start(final @NotNull ProtocolAdapterWrapper protocolAdap
318317

319318
private @NotNull CompletableFuture<Void> startWriting(final @NotNull ProtocolAdapterWrapper protocolAdapterWrapper) {
320319
final CompletableFuture<Void> startWritingFuture;
321-
if (writingEnabled() &&
322-
protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) {
320+
if (writingEnabled() && protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) {
323321
if (log.isDebugEnabled()) {
324322
log.debug("Start writing for protocol adapter with id '{}'", protocolAdapterWrapper.getId());
325323
}
@@ -377,8 +375,13 @@ private void schedulePolling(final @NotNull ProtocolAdapterWrapper protocolAdapt
377375

378376
return stopWritingFuture.thenComposeAsync(ignored -> {
379377
final ProtocolAdapterStopOutputImpl adapterStopOutput = new ProtocolAdapterStopOutputImpl();
380-
protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput);
381-
return adapterStopOutput.getOutputFuture();
378+
if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) {
379+
protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput);
380+
return adapterStopOutput.getOutputFuture();
381+
} else {
382+
return CompletableFuture.completedFuture(null);
383+
}
384+
382385
}, executorService).<Void>thenApply(input -> {
383386
log.info("Protocol-adapter '{}' stopped successfully.", protocolAdapterWrapper.getId());
384387
protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
@@ -465,12 +468,9 @@ public boolean deleteAdapter(final @NotNull String id) {
465468
if (adapterWrapper != null) {
466469
protocolAdapterMetrics.decreaseProtocolAdapterMetric(adapterWrapper.getAdapterInformation()
467470
.getProtocolId());
468-
469471
try {
470-
if (adapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) {
471-
// FIXME: We need to adapt the whole flow to async
472-
stop(adapterWrapper).get();
473-
}
472+
// stop in any case as some resources must be cleaned up even if the adapter is still being started and is not yet in started state
473+
stop(adapterWrapper).get();
474474
} catch (final InterruptedException e) {
475475
Thread.currentThread().interrupt();
476476
} catch (final ExecutionException e) {

0 commit comments

Comments
 (0)