@@ -80,6 +80,7 @@ public class ProtocolAdapterWrapper extends ProtocolAdapterFSM {
8080 private final @ NotNull TagManager tagManager ;
8181 private final @ NotNull List <NorthboundTagConsumer > consumers ;
8282 private final @ NotNull ReentrantLock operationLock ;
83+ private final @ NotNull Object adapterLock ; // protects underlying adapter start/stop calls
8384 private final @ NotNull ExecutorService sharedAdapterExecutor ;
8485 protected volatile @ Nullable Long lastStartAttemptTime ;
8586 private @ Nullable CompletableFuture <Void > currentStartFuture ;
@@ -114,6 +115,7 @@ public ProtocolAdapterWrapper(
114115 this .consumers = new CopyOnWriteArrayList <>();
115116 this .operationLock = new ReentrantLock ();
116117 this .sharedAdapterExecutor = sharedAdapterExecutor ;
118+ this .adapterLock = new Object ();
117119
118120 if (log .isDebugEnabled ()) {
119121 registerStateTransitionListener (state -> log .debug (
@@ -375,8 +377,17 @@ public boolean isBatchPolling() {
375377 }
376378
377379 public void addTagHotReload (final @ NotNull Tag tag , final @ NotNull EventService eventService ) {
380+ // Wait for any in-progress operations before proceeding
381+ waitForOperationsToComplete ();
382+
378383 operationLock .lock ();
379384 try {
385+ if (startOperationInProgress || stopOperationInProgress ) {
386+ throw new IllegalStateException ("Cannot hot-reload tag for adapter '" +
387+ getId () +
388+ "': operation started during wait" );
389+ }
390+
380391 // Update config with new tag regardless of adapter state
381392 final List <Tag > updatedTags = new ArrayList <>(config .getTags ());
382393 updatedTags .add (tag );
@@ -407,9 +418,18 @@ public void addTagHotReload(final @NotNull Tag tag, final @NotNull EventService
407418
408419 public void updateMappingsHotReload (
409420 final @ Nullable List <NorthboundMapping > northboundMappings ,
410- final @ Nullable List <SouthboundMapping > southboundMappings ) {
421+ final @ Nullable List <SouthboundMapping > southboundMappings ,
422+ final @ NotNull EventService eventService ) {
423+ waitForOperationsToComplete ();
424+
411425 operationLock .lock ();
412426 try {
427+ if (startOperationInProgress || stopOperationInProgress ) {
428+ throw new IllegalStateException ("Cannot hot-reload mappings for adapter '" +
429+ getId () +
430+ "': operation started during wait" );
431+ }
432+
413433 if (northboundMappings != null ) {
414434 config .setNorthboundMappings (northboundMappings );
415435 }
@@ -420,25 +440,54 @@ public void updateMappingsHotReload(
420440 log .debug ("Adapter '{}' not started yet, only updating config for mappings" , getId ());
421441 return ;
422442 }
423- log .debug ("Stopping existing consumers for adapter '{}'" , getId ());
424- consumers .forEach (tagManager ::removeConsumer );
425- consumers .clear ();
443+
444+ // Stop existing consumers and polling
426445 if (northboundMappings != null ) {
446+ log .debug ("Stopping existing consumers and polling for adapter '{}'" , getId ());
447+ consumers .forEach (tagManager ::removeConsumer );
448+ consumers .clear ();
449+
450+ // Stop polling to restart with new mappings
451+ stopPolling ();
452+
427453 log .debug ("Updating northbound mappings for adapter '{}'" , getId ());
428454 northboundMappings .stream ()
429455 .map (mapping -> northboundConsumerFactory .build (this , mapping , protocolAdapterMetricsService ))
430456 .forEach (consumer -> {
431457 tagManager .addConsumer (consumer );
432458 consumers .add (consumer );
433459 });
460+
461+ // Restart polling with new consumers
462+ log .debug ("Restarting polling for adapter '{}'" , getId ());
463+ if (isBatchPolling ()) {
464+ log .debug ("Schedule batch polling for protocol adapter with id '{}'" , getId ());
465+ pollingService .schedulePolling (new PerAdapterSampler (this , eventService , tagManager ));
466+ }
467+ if (isPolling ()) {
468+ config .getTags ()
469+ .forEach (tag -> pollingService .schedulePolling (new PerContextSampler (this ,
470+ new PollingContextWrapper ("unused" ,
471+ tag .getName (),
472+ MessageHandlingOptions .MQTTMessagePerTag ,
473+ false ,
474+ false ,
475+ List .of (),
476+ 1 ,
477+ -1 ),
478+ eventService ,
479+ tagManager )));
480+ }
434481 }
482+
435483 if (southboundMappings != null && isWriting ()) {
436484 log .debug ("Updating southbound mappings for adapter '{}'" , getId ());
437485 final StateEnum currentSouthboundState = currentState ().southbound ();
438- if (currentSouthboundState == StateEnum .CONNECTED ) {
486+ final boolean wasConnected = (currentSouthboundState == StateEnum .CONNECTED );
487+ if (wasConnected ) {
488+ log .debug ("Stopping southbound for adapter '{}' before hot-reload" , getId ());
439489 stopWriting ();
440- }
441- if (currentSouthboundState == StateEnum .CONNECTED ) {
490+ log .debug ("Restarting southbound for adapter '{}' after hot-reload" , getId ());
442491 startSouthbound ();
443492 }
444493 }
@@ -448,6 +497,35 @@ public void updateMappingsHotReload(
448497 }
449498 }
450499
500+ private void waitForOperationsToComplete () {
501+ CompletableFuture <Void > futureToWait = null ;
502+ operationLock .lock ();
503+ try {
504+ if (startOperationInProgress ) {
505+ log .debug ("Adapter '{}': Waiting for start operation to complete before hot-reload" , getId ());
506+ futureToWait = currentStartFuture ;
507+ } else if (stopOperationInProgress ) {
508+ log .debug ("Adapter '{}': Waiting for stop operation to complete before hot-reload" , getId ());
509+ futureToWait = currentStopFuture ;
510+ }
511+ } finally {
512+ operationLock .unlock ();
513+ }
514+
515+ if (futureToWait != null ) {
516+ try {
517+ // Wait with a timeout to prevent indefinite blocking
518+ futureToWait .get (30 , TimeUnit .SECONDS );
519+ log .debug ("Adapter '{}': Operation completed, proceeding with hot-reload" , getId ());
520+ } catch (final TimeoutException e ) {
521+ log .warn ("Adapter '{}': Operation did not complete within 30 seconds, proceeding with hot-reload anyway" ,
522+ getId ());
523+ } catch (final Exception e ) {
524+ log .warn ("Adapter '{}': Operation completed with error, but proceeding with hot-reload" , getId (), e );
525+ }
526+ }
527+ }
528+
451529 private void cleanupConnectionStatusListener () {
452530 final Consumer <ProtocolAdapterState .ConnectionStatus > listenerToClean = connectionStatusListener ;
453531 if (listenerToClean != null ) {
@@ -548,10 +626,15 @@ private void stopWriting() {
548626 return () -> {
549627 startAdapter (); // start FSM
550628 final ProtocolAdapterStartOutputImpl output = new ProtocolAdapterStartOutputImpl ();
551- try {
552- adapter .start (new ProtocolAdapterStartInputImpl (moduleServices ), output );
553- } catch (final Throwable t ) {
554- output .getStartFuture ().completeExceptionally (t );
629+ synchronized (adapterLock ) {
630+ log .debug ("Adapter '{}': Calling adapter.start() in thread '{}'" ,
631+ getId (),
632+ Thread .currentThread ().getName ());
633+ try {
634+ adapter .start (new ProtocolAdapterStartInputImpl (moduleServices ), output );
635+ } catch (final Throwable t ) {
636+ output .getStartFuture ().completeExceptionally (t );
637+ }
555638 }
556639 return output .getStartFuture ();
557640 };
@@ -602,11 +685,14 @@ private void stopProtocolAdapterOnFailedStart() {
602685
603686 // Initiate adapter stop
604687 final var output = new ProtocolAdapterStopOutputImpl ();
605- try {
606- adapter .stop (new ProtocolAdapterStopInputImpl (), output );
607- } catch (final Throwable throwable ) {
608- log .error ("Adapter '{}': Exception during adapter.stop()" , adapter .getId (), throwable );
609- output .getOutputFuture ().completeExceptionally (throwable );
688+ synchronized (adapterLock ) {
689+ log .debug ("Adapter '{}': Calling adapter.stop() in thread '{}'" , getId (), Thread .currentThread ().getName ());
690+ try {
691+ adapter .stop (new ProtocolAdapterStopInputImpl (), output );
692+ } catch (final Throwable throwable ) {
693+ log .error ("Adapter '{}': Exception during adapter.stop()" , adapter .getId (), throwable );
694+ output .getOutputFuture ().completeExceptionally (throwable );
695+ }
610696 }
611697
612698 log .debug ("Adapter '{}': Waiting for stop output future" , adapter .getId ());
0 commit comments