2727import com .hivemq .adapter .sdk .api .data .JsonPayloadCreator ;
2828import com .hivemq .adapter .sdk .api .events .EventService ;
2929import com .hivemq .adapter .sdk .api .events .model .Payload ;
30- import com .hivemq .adapter .sdk .api .factories .DataPointFactory ;
3130import com .hivemq .adapter .sdk .api .services .ProtocolAdapterMetricsService ;
3231import com .hivemq .edge .modules .adapters .data .DataPointImpl ;
3332import com .hivemq .edge .modules .adapters .impl .ProtocolAdapterPublishServiceImpl ;
@@ -59,7 +58,6 @@ public class NorthboundTagConsumer implements TagConsumer{
5958 private final @ NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService ;
6059 private final @ NotNull EventService eventService ;
6160 private final @ NotNull AtomicInteger publishCount = new AtomicInteger (0 );
62- private final @ NotNull DataPointFactory dataPointFactory ;
6361
6462 public NorthboundTagConsumer (
6563 final @ NotNull PollingContext pollingContext ,
@@ -76,19 +74,6 @@ public NorthboundTagConsumer(
7674 this .protocolAdapterPublishService = protocolAdapterPublishService ;
7775 this .protocolAdapterMetricsService = protocolAdapterMetricsService ;
7876 this .eventService = eventService ;
79- this .dataPointFactory = new DataPointFactory () {
80- @ Override
81- public @ NotNull DataPoint create (final @ NotNull String tagName , final @ NotNull Object tagValue ) {
82- return new DataPointImpl (tagName , tagValue );
83- }
84-
85- @ Override
86- public @ NotNull DataPoint createJsonDataPoint (
87- final @ NotNull String tagName ,
88- final @ NotNull Object tagValue ) {
89- return new DataPointImpl (tagName , tagValue , true );
90- }
91- };
9277 }
9378
9479 public void accept (final @ NotNull List <DataPoint > dataPoints ) {
@@ -101,8 +86,6 @@ public void accept(final @NotNull List<DataPoint> dataPoints) {
10186 try {
10287 final ImmutableList .Builder <CompletableFuture <?>> publishFutures = ImmutableList .builder ();
10388
104- final List <byte []> jsonPayloadsAsBytes = new ArrayList <>();
105-
10689 final JsonPayloadCreator jsonPayloadCreatorOverride = pollingContext .getJsonPayloadCreator ();
10790
10891 final List <DataPoint > jsonDataPoints =
@@ -111,23 +94,25 @@ public void accept(final @NotNull List<DataPoint> dataPoints) {
11194 final var preparedJsonDataPoints = jsonDataPoints .stream ().map (jsonDataPoint -> {
11295 try {
11396 final var jsonMap =objectMapper .readValue ((String )jsonDataPoint .getTagValue (), typeRef );
114- final var value = jsonMap .get ("value" );
115- if (value !=null ) {
116- return dataPointFactory .create (jsonDataPoint .getTagName (), value );
97+ if (jsonMap .size () > 1 && jsonMap .containsKey ("value" )) {
98+ return new DataPointImpl (jsonDataPoint .getTagName (), jsonMap );
99+ } else if (jsonMap .containsKey ("value" )) {
100+ return new DataPointImpl (jsonDataPoint .getTagName (), jsonMap .get ("value" ), true );
117101 } else {
118102 throw new RuntimeException ("No value entry in JSON message" );
119103 }
120- } catch (JsonProcessingException e ) {
104+ } catch (final JsonProcessingException e ) {
121105 throw new RuntimeException (e );
122106 }
123107 }).toList ();
124108
125109 final var dataPointsCopied = new ArrayList <>(dataPoints );
126110 dataPointsCopied .removeAll (jsonDataPoints );
127111 dataPointsCopied .addAll (preparedJsonDataPoints );
128- jsonPayloadsAsBytes
129- .addAll (Objects .requireNonNullElse (jsonPayloadCreatorOverride , jsonPayloadCreator )
130- .convertToJson (dataPointsCopied , pollingContext , objectMapper ));
112+
113+ final List <byte []> jsonPayloadsAsBytes =
114+ new ArrayList <>(Objects .requireNonNullElse (jsonPayloadCreatorOverride , jsonPayloadCreator )
115+ .convertToJson (dataPointsCopied , pollingContext , objectMapper ));
131116
132117 for (final byte [] json : jsonPayloadsAsBytes ) {
133118 final ProtocolAdapterPublishBuilder publishBuilder = protocolAdapterPublishService .createPublish ()
0 commit comments