7070import com .hivemq .edge .api .model .StatusTransitionResult ;
7171import com .hivemq .edge .api .model .TagSchema ;
7272import com .hivemq .edge .modules .adapters .impl .ProtocolAdapterDiscoveryOutputImpl ;
73+ import com .hivemq .persistence .mappings .NorthboundMapping ;
74+ import com .hivemq .persistence .mappings .SouthboundMapping ;
7375import com .hivemq .persistence .topicfilter .TopicFilterPersistence ;
7476import com .hivemq .persistence .topicfilter .TopicFilterPojo ;
7577import com .hivemq .protocols .InternalProtocolAdapterWritingService ;
@@ -803,22 +805,31 @@ public int getDepth() {
803805 missingTags ));
804806 }
805807
806- return configExtractor .getAdapterByAdapterId (adapterId )
807- .map (cfg -> new ProtocolAdapterEntity (cfg .getAdapterId (),
808- cfg .getProtocolId (),
809- cfg .getConfigVersion (),
810- cfg .getConfig (),
811- converted ,
812- cfg .getSouthboundMappings (),
813- cfg .getTags ()))
814- .map (newCfg -> {
815- if (!configExtractor .updateAdapter (newCfg )) {
816- return adapterCannotBeUpdatedError (adapterId );
817- }
818- log .info ("Successfully updated northbound mappings for adapter '{}'." , adapterId );
819- return Response .ok (northboundMappings ).build ();
820- })
821- .orElseGet (adapterNotUpdatedError (adapterId ));
808+ final List <NorthboundMapping > convertedMappings =
809+ converted .stream ().map (NorthboundMappingEntity ::toPersistence ).toList ();
810+ if (protocolAdapterManager .updateNorthboundMappingsHotReload (adapterId , convertedMappings )) {
811+ // update config persistence
812+ return configExtractor .getAdapterByAdapterId (adapterId )
813+ .map (cfg -> new ProtocolAdapterEntity (cfg .getAdapterId (),
814+ cfg .getProtocolId (),
815+ cfg .getConfigVersion (),
816+ cfg .getConfig (),
817+ converted ,
818+ cfg .getSouthboundMappings (),
819+ cfg .getTags ()))
820+ .map (newCfg -> {
821+ if (!configExtractor .updateAdapter (newCfg )) {
822+ return adapterCannotBeUpdatedError (adapterId );
823+ }
824+ log .info ("Successfully updated northbound mappings for adapter '{}' via hot-reload." ,
825+ adapterId );
826+ return Response .ok (northboundMappings ).build ();
827+ })
828+ .orElseGet (adapterNotUpdatedError (adapterId ));
829+ } else {
830+ log .error ("Hot-reload failed for northbound mappings on adapter '{}'" , adapterId );
831+ return errorResponse (new InternalServerError ("Failed to hot-reload northbound mappings" ));
832+ }
822833 };
823834 }
824835
@@ -841,22 +852,31 @@ public int getDepth() {
841852 missingTags ));
842853 }
843854
844- return configExtractor .getAdapterByAdapterId (adapterId )
845- .map (cfg -> new ProtocolAdapterEntity (cfg .getAdapterId (),
846- cfg .getProtocolId (),
847- cfg .getConfigVersion (),
848- cfg .getConfig (),
849- cfg .getNorthboundMappings (),
850- converted ,
851- cfg .getTags ()))
852- .map (newCfg -> {
853- if (!configExtractor .updateAdapter (newCfg )) {
854- return adapterCannotBeUpdatedError (adapterId );
855- }
856- log .info ("Successfully updated fromMappings for adapter '{}'." , adapterId );
857- return Response .ok (southboundMappings ).build ();
858- })
859- .orElseGet (adapterNotUpdatedError (adapterId ));
855+ final List <SouthboundMapping > convertedMappings =
856+ converted .stream ().map (entity -> entity .toPersistence (objectMapper )).toList ();
857+ if (protocolAdapterManager .updateSouthboundMappingsHotReload (adapterId , convertedMappings )) {
858+ // update config persistence
859+ return configExtractor .getAdapterByAdapterId (adapterId )
860+ .map (cfg -> new ProtocolAdapterEntity (cfg .getAdapterId (),
861+ cfg .getProtocolId (),
862+ cfg .getConfigVersion (),
863+ cfg .getConfig (),
864+ cfg .getNorthboundMappings (),
865+ converted ,
866+ cfg .getTags ()))
867+ .map (newCfg -> {
868+ if (!configExtractor .updateAdapter (newCfg )) {
869+ return adapterCannotBeUpdatedError (adapterId );
870+ }
871+ log .info ("Successfully updated southbound mappings for adapter '{}' via hot-reload." ,
872+ adapterId );
873+ return Response .ok (southboundMappings ).build ();
874+ })
875+ .orElseGet (adapterNotUpdatedError (adapterId ));
876+ } else {
877+ log .error ("Hot-reload failed for southbound mappings on adapter '{}'" , adapterId );
878+ return errorResponse (new InternalServerError ("Failed to hot-reload southbound mappings" ));
879+ }
860880 };
861881 }
862882
@@ -911,11 +931,10 @@ private void validateAdapterSchema(
911931
912932 private @ NotNull Adapter toAdapter (final @ NotNull ProtocolAdapterWrapper value ) {
913933 final String adapterId = value .getId ();
914- final Map <String , Object > config = runWithContextLoader (
915- value .getAdapterFactory ().getClass ().getClassLoader (),
916- () -> {
917- final Map <String , Object > cfg = value .getAdapterFactory ()
918- .unconvertConfigObject (objectMapper , value .getConfigObject ());
934+ final Map <String , Object > config =
935+ runWithContextLoader (value .getAdapterFactory ().getClass ().getClassLoader (), () -> {
936+ final Map <String , Object > cfg =
937+ value .getAdapterFactory ().unconvertConfigObject (objectMapper , value .getConfigObject ());
919938 cfg .put ("id" , value .getId ());
920939 return cfg ;
921940 });
0 commit comments