|
22 | 22 | import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryInput; |
23 | 23 | import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryOutput; |
24 | 24 | import com.hivemq.adapter.sdk.api.events.EventService; |
| 25 | +import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopInput; |
| 26 | +import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopOutput; |
25 | 27 | import com.hivemq.adapter.sdk.api.factories.ProtocolAdapterFactory; |
26 | 28 | import com.hivemq.adapter.sdk.api.polling.PollingProtocolAdapter; |
27 | 29 | import com.hivemq.adapter.sdk.api.polling.batch.BatchPollingProtocolAdapter; |
@@ -338,38 +340,48 @@ private void cleanupConnectionStatusListener() { |
338 | 340 |
|
339 | 341 | log.debug("Adapter '{}': Creating stop operation future", getId()); |
340 | 342 |
|
341 | | - final var stopFuture = CompletableFuture.supplyAsync(() -> { |
342 | | - log.debug("Adapter '{}': Stop operation executing in thread '{}'", getId(), Thread.currentThread().getName()); |
343 | | - |
344 | | - // Signal FSM to stop (calls onStopping() internally) |
345 | | - log.debug("Adapter '{}': Stopping adapter FSM", getId()); |
346 | | - stopAdapter(); |
347 | | - |
348 | | - // Clean up listeners to prevent memory leaks |
349 | | - log.debug("Adapter '{}': Cleaning up connection status listener", getId()); |
350 | | - cleanupConnectionStatusListener(); |
351 | | - |
352 | | - // Remove consumers - must be done within async context |
353 | | - log.debug("Adapter '{}': Removing {} consumers", getId(), consumers.size()); |
354 | | - consumers.forEach(tagManager::removeConsumer); |
355 | | - |
356 | | - log.debug("Adapter '{}': Stopping polling", getId()); |
357 | | - stopPolling(protocolAdapterPollingService); |
| 343 | + // Defensive check: if executor is shutdown, execute stop synchronously |
| 344 | + // This can happen during JVM shutdown if there's a race between shutdown hooks |
| 345 | + if (sharedAdapterExecutor.isShutdown()) { |
| 346 | + log.warn("Adapter '{}': Executor is shutdown, executing stop operation synchronously in current thread", getId()); |
| 347 | + try { |
| 348 | + // Execute stop logic directly in calling thread |
| 349 | + final CompletableFuture<Void> syncFuture = performStopOperation(input, output) |
| 350 | + .whenComplete((result, throwable) -> { |
| 351 | + log.debug("Adapter '{}': Synchronous stop operation completed, starting cleanup", getId()); |
| 352 | + |
| 353 | + // Always call destroy() to ensure all resources are properly released |
| 354 | + try { |
| 355 | + log.info("Destroying adapter with id '{}' to release all resources", getId()); |
| 356 | + adapter.destroy(); |
| 357 | + log.debug("Adapter '{}': destroy() completed successfully", getId()); |
| 358 | + } catch (final Exception destroyException) { |
| 359 | + log.error("Error destroying adapter with id {}", adapter.getId(), destroyException); |
| 360 | + } |
| 361 | + |
| 362 | + if (throwable == null) { |
| 363 | + log.info("Stopped adapter with id '{}' successfully", adapter.getId()); |
| 364 | + } else { |
| 365 | + log.error("Error stopping adapter with id {}", adapter.getId(), throwable); |
| 366 | + } |
| 367 | + |
| 368 | + // Clear reference to stop future |
| 369 | + log.debug("Adapter '{}': Cleared currentStopFuture reference", getId()); |
| 370 | + currentStopFuture = null; |
| 371 | + }); |
358 | 372 |
|
359 | | - log.debug("Adapter '{}': Stopping writing", getId()); |
360 | | - stopWriting(protocolAdapterWritingService); |
| 373 | + currentStopFuture = syncFuture; |
| 374 | + return syncFuture; |
| 375 | + } catch (final Exception e) { |
| 376 | + log.error("Adapter '{}': Exception during synchronous stop", getId(), e); |
| 377 | + return CompletableFuture.failedFuture(e); |
| 378 | + } |
| 379 | + } |
361 | 380 |
|
362 | | - try { |
363 | | - log.debug("Adapter '{}': Calling adapter.stop()", getId()); |
364 | | - adapter.stop(input, output); |
365 | | - } catch (final Throwable throwable) { |
366 | | - log.error("Adapter '{}': Exception during adapter.stop()", getId(), throwable); |
367 | | - output.getOutputFuture().completeExceptionally(throwable); |
368 | | - } |
369 | | - log.debug("Adapter '{}': Waiting for stop output future", getId()); |
370 | | - return output.getOutputFuture(); |
371 | | - }, sharedAdapterExecutor) // Use shared executor to reduce thread overhead |
372 | | - .thenCompose(Function.identity()).whenComplete((result, throwable) -> { |
| 381 | + final var stopFuture = CompletableFuture.supplyAsync(() -> performStopOperation(input, output), |
| 382 | + sharedAdapterExecutor) // Use shared executor to reduce thread overhead |
| 383 | + .thenCompose(Function.identity()) |
| 384 | + .whenComplete((result, throwable) -> { |
373 | 385 | log.debug("Adapter '{}': Stop operation completed, starting cleanup", getId()); |
374 | 386 |
|
375 | 387 | // Always call destroy() to ensure all resources are properly released |
@@ -566,4 +578,47 @@ private void createAndSubscribeTagConsumer() { |
566 | 578 | consumers.add(northboundTagConsumer); |
567 | 579 | }); |
568 | 580 | } |
| 581 | + |
| 582 | + /** |
| 583 | + * Performs the actual stop operation for the adapter. |
| 584 | + * Extracted into a separate method so it can be called both asynchronously |
| 585 | + * (normal case) and synchronously (when executor is shutdown during JVM shutdown). |
| 586 | + * |
| 587 | + * @param input the stop input |
| 588 | + * @param output the stop output |
| 589 | + * @return the completion future from the adapter's stop operation |
| 590 | + */ |
| 591 | + private @NotNull CompletableFuture<Void> performStopOperation( |
| 592 | + final @NotNull ProtocolAdapterStopInput input, |
| 593 | + final @NotNull ProtocolAdapterStopOutput output) { |
| 594 | + log.debug("Adapter '{}': Stop operation executing in thread '{}'", getId(), Thread.currentThread().getName()); |
| 595 | + |
| 596 | + // Signal FSM to stop (calls onStopping() internally) |
| 597 | + log.debug("Adapter '{}': Stopping adapter FSM", getId()); |
| 598 | + stopAdapter(); |
| 599 | + |
| 600 | + // Clean up listeners to prevent memory leaks |
| 601 | + log.debug("Adapter '{}': Cleaning up connection status listener", getId()); |
| 602 | + cleanupConnectionStatusListener(); |
| 603 | + |
| 604 | + // Remove consumers |
| 605 | + log.debug("Adapter '{}': Removing {} consumers", getId(), consumers.size()); |
| 606 | + consumers.forEach(tagManager::removeConsumer); |
| 607 | + |
| 608 | + log.debug("Adapter '{}': Stopping polling", getId()); |
| 609 | + stopPolling(protocolAdapterPollingService); |
| 610 | + |
| 611 | + log.debug("Adapter '{}': Stopping writing", getId()); |
| 612 | + stopWriting(protocolAdapterWritingService); |
| 613 | + |
| 614 | + try { |
| 615 | + log.debug("Adapter '{}': Calling adapter.stop()", getId()); |
| 616 | + adapter.stop(input, output); |
| 617 | + } catch (final Throwable throwable) { |
| 618 | + log.error("Adapter '{}': Exception during adapter.stop()", getId(), throwable); |
| 619 | + ((ProtocolAdapterStopOutputImpl) output).getOutputFuture().completeExceptionally(throwable); |
| 620 | + } |
| 621 | + log.debug("Adapter '{}': Waiting for stop output future", getId()); |
| 622 | + return ((ProtocolAdapterStopOutputImpl) output).getOutputFuture(); |
| 623 | + } |
569 | 624 | } |
0 commit comments