|
37 | 37 | import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState; |
38 | 38 | import com.hivemq.adapter.sdk.api.writing.WritingContext; |
39 | 39 | import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter; |
40 | | -import com.hivemq.bootstrap.factories.WritingServiceProvider; |
41 | 40 | import com.hivemq.configuration.service.ConfigurationService; |
42 | 41 | import com.hivemq.edge.HiveMQEdgeRemoteService; |
43 | 42 | import com.hivemq.edge.VersionProvider; |
@@ -318,8 +317,7 @@ CompletableFuture<Void> start(final @NotNull ProtocolAdapterWrapper protocolAdap |
318 | 317 |
|
319 | 318 | private @NotNull CompletableFuture<Void> startWriting(final @NotNull ProtocolAdapterWrapper protocolAdapterWrapper) { |
320 | 319 | final CompletableFuture<Void> startWritingFuture; |
321 | | - if (writingEnabled() && |
322 | | - protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) { |
| 320 | + if (writingEnabled() && protocolAdapterWrapper.getAdapter() instanceof WritingProtocolAdapter) { |
323 | 321 | if (log.isDebugEnabled()) { |
324 | 322 | log.debug("Start writing for protocol adapter with id '{}'", protocolAdapterWrapper.getId()); |
325 | 323 | } |
@@ -377,8 +375,13 @@ private void schedulePolling(final @NotNull ProtocolAdapterWrapper protocolAdapt |
377 | 375 |
|
378 | 376 | return stopWritingFuture.thenComposeAsync(ignored -> { |
379 | 377 | final ProtocolAdapterStopOutputImpl adapterStopOutput = new ProtocolAdapterStopOutputImpl(); |
380 | | - protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput); |
381 | | - return adapterStopOutput.getOutputFuture(); |
| 378 | + if (protocolAdapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) { |
| 379 | + protocolAdapterWrapper.stop(new ProtocolAdapterStopInputImpl(), adapterStopOutput); |
| 380 | + return adapterStopOutput.getOutputFuture(); |
| 381 | + } else { |
| 382 | + return CompletableFuture.completedFuture(null); |
| 383 | + } |
| 384 | + |
382 | 385 | }, executorService).<Void>thenApply(input -> { |
383 | 386 | log.info("Protocol-adapter '{}' stopped successfully.", protocolAdapterWrapper.getId()); |
384 | 387 | protocolAdapterWrapper.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); |
@@ -465,12 +468,9 @@ public boolean deleteAdapter(final @NotNull String id) { |
465 | 468 | if (adapterWrapper != null) { |
466 | 469 | protocolAdapterMetrics.decreaseProtocolAdapterMetric(adapterWrapper.getAdapterInformation() |
467 | 470 | .getProtocolId()); |
468 | | - |
469 | 471 | try { |
470 | | - if (adapterWrapper.getRuntimeStatus() == ProtocolAdapterState.RuntimeStatus.STARTED) { |
471 | | - // FIXME: We need to adapt the whole flow to async |
472 | | - stop(adapterWrapper).get(); |
473 | | - } |
| 472 | + // stop in any case as some resources must be cleaned up even if the adapter is still being started and is not yet in started state |
| 473 | + stop(adapterWrapper).get(); |
474 | 474 | } catch (final InterruptedException e) { |
475 | 475 | Thread.currentThread().interrupt(); |
476 | 476 | } catch (final ExecutionException e) { |
|
0 commit comments