Skip to content

Commit 9c47a1c

Browse files
Restore messageHandlingOptions to northboundMapping config, ignoring the setting (#1187)
* Restore messageHandlingOptions to northboundMapping config, ignoring the setting * Remove test flakyness from mqttsn * Fix config file * Added warn log for deprecated messageHandlingOptions field --------- Co-authored-by: Jochen Mader <[email protected]>
1 parent 235bc93 commit 9c47a1c

File tree

10 files changed

+79
-31
lines changed

10 files changed

+79
-31
lines changed

hivemq-edge/src/main/java/com/hivemq/configuration/entity/adapter/NorthboundMappingEntity.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,19 @@
2525
import jakarta.xml.bind.annotation.XmlElementWrapper;
2626
import org.jetbrains.annotations.NotNull;
2727
import org.jetbrains.annotations.Nullable;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
import java.util.ArrayList;
3032
import java.util.List;
3133
import java.util.Objects;
3234

3335
public class NorthboundMappingEntity implements EntityValidatable {
3436

37+
private static final @NotNull Logger log = LoggerFactory.getLogger(NorthboundMappingEntity.class);
38+
39+
private static final @NotNull MessageHandlingOptions NORTHBOUND_OPTS = MessageHandlingOptions.MQTTMessagePerTag;
40+
3541
@XmlElement(name = "tagName", required = true)
3642
private final @NotNull String tagName;
3743

@@ -41,6 +47,9 @@ public class NorthboundMappingEntity implements EntityValidatable {
4147
@XmlElement(name = "maxQos", required = true)
4248
private final int maxQoS;
4349

50+
@XmlElement(name = "messageHandlingOptions", defaultValue = "MQTTMessagePerTag", nillable = true)
51+
private final @Nullable MessageHandlingOptions messageHandlingOptions;
52+
4453
@XmlElement(name = "includeTagNames", required = true)
4554
private final @Nullable Boolean includeTagNames;
4655

@@ -63,19 +72,23 @@ public NorthboundMappingEntity() {
6372
includeTimestamp = true;
6473
userProperties = new ArrayList<>();
6574
messageExpiryInterval = Long.MAX_VALUE;
75+
messageHandlingOptions = NORTHBOUND_OPTS;
6676
}
6777

6878
public NorthboundMappingEntity(
6979
final @NotNull String tagName,
7080
final @NotNull String topic,
7181
final int maxQoS,
82+
final @Nullable MessageHandlingOptions ignore,
7283
final boolean includeTagNames,
7384
final boolean includeTimestamp,
7485
final @NotNull List<MqttUserPropertyEntity> userProperties,
7586
final @Nullable Long messageExpiryInterval) {
7687
this.tagName = tagName;
7788
this.topic = topic;
7889
this.maxQoS = maxQoS;
90+
log.warn("The 'messageHandlingOptions' property in the 'northboundMapping' configuration is ignored. Always using 'MQTTMessagePerTag' handling.");
91+
this.messageHandlingOptions = null;
7992
this.includeTagNames = includeTagNames;
8093
this.includeTimestamp = includeTimestamp;
8194
this.userProperties = userProperties;
@@ -86,6 +99,7 @@ public NorthboundMappingEntity(
8699
return new NorthboundMappingEntity(mapping.getTagName(),
87100
mapping.getMqttTopic(),
88101
mapping.getMqttQos(),
102+
NORTHBOUND_OPTS,
89103
mapping.getIncludeTagNames(),
90104
mapping.getIncludeTimestamp(),
91105
mapping.getUserProperties().stream().map(NorthboundMappingEntity::userProp).toList(),
@@ -108,6 +122,7 @@ public NorthboundMappingEntity(
108122
case AT_LEAST_ONCE -> 1;
109123
case EXACTLY_ONCE -> 2;
110124
},
125+
NORTHBOUND_OPTS,
111126
mapping.getIncludeTagNames(),
112127
mapping.getIncludeTimestamp(),
113128
mapping.getUserProperties().stream().map(NorthboundMappingEntity::userProp).toList(),
@@ -123,7 +138,7 @@ public NorthboundMappingEntity(
123138
}
124139

125140
public @NotNull MessageHandlingOptions getMessageHandlingOptions() {
126-
return MessageHandlingOptions.MQTTMessagePerTag;
141+
return NORTHBOUND_OPTS;
127142
}
128143

129144
public boolean isIncludeTagNames() {
@@ -153,6 +168,7 @@ public void validate(final @NotNull List<ValidationEvent> validationEvents) {
153168
EntityValidatable.notMatch(validationEvents,
154169
() -> QoS.valueOf(maxQoS) != null,
155170
() -> "maxQos" + ' ' + maxQoS + " is invalid");
171+
EntityValidatable.notNull(validationEvents, messageHandlingOptions, "messageHandlingOptions");
156172
EntityValidatable.notNull(validationEvents, includeTagNames, "includeTagNames");
157173
EntityValidatable.notNull(validationEvents, includeTimestamp, "includeTimestamp");
158174
if (EntityValidatable.notNull(validationEvents, messageExpiryInterval, "messageExpiryInterval")) {
@@ -200,6 +216,7 @@ public boolean equals(final @Nullable Object o) {
200216
return Objects.equals(tagName, that.tagName) &&
201217
Objects.equals(topic, that.topic) &&
202218
maxQoS == that.maxQoS &&
219+
Objects.equals(messageHandlingOptions, that.messageHandlingOptions) &&
203220
includeTagNames == that.includeTagNames &&
204221
includeTimestamp == that.includeTimestamp &&
205222
Objects.equals(userProperties, that.userProperties) &&
@@ -213,6 +230,7 @@ public int hashCode() {
213230
return Objects.hash(tagName,
214231
topic,
215232
maxQoS,
233+
messageHandlingOptions,
216234
includeTagNames,
217235
includeTimestamp,
218236
userProperties,

hivemq-edge/src/test/java/com/hivemq/configuration/reader/ProtocolAdapterExtractorTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.hivemq.configuration.reader;
1717

18+
import com.hivemq.adapter.sdk.api.config.MessageHandlingOptions;
1819
import com.hivemq.configuration.entity.HiveMQConfigEntity;
1920
import com.hivemq.configuration.entity.adapter.NorthboundMappingEntity;
2021
import com.hivemq.configuration.entity.adapter.ProtocolAdapterEntity;
@@ -371,7 +372,7 @@ public void whenNoTags_setConfigurationShouldReturnFalse() throws IOException {
371372
final HiveMQConfigEntity entity = configFileReader.applyConfig();
372373
assertThat(entity).isNotNull();
373374
final NorthboundMappingEntity northboundMappingEntity =
374-
new NorthboundMappingEntity("tagName", "topic", 1, false, true, List.of(), 100L);
375+
new NorthboundMappingEntity("tagName", "topic", 1, null, false, true, List.of(), 100L);
375376
final ProtocolAdapterEntity protocolAdapterEntity = new ProtocolAdapterEntity("adapterId",
376377
"protocolId",
377378
1,
@@ -388,8 +389,14 @@ public void whenNorthboundMappingTagNameAreNotFound_setConfigurationShouldReturn
388389
final ConfigFileReaderWriter configFileReader = getConfigFileReaderWriter();
389390
final HiveMQConfigEntity entity = configFileReader.applyConfig();
390391
assertThat(entity).isNotNull();
391-
final NorthboundMappingEntity northboundMappingEntity =
392-
new NorthboundMappingEntity("tagName", "topic", 1, false, true, List.of(), 100L);
392+
final NorthboundMappingEntity northboundMappingEntity = new NorthboundMappingEntity("tagName",
393+
"topic",
394+
1,
395+
MessageHandlingOptions.MQTTMessagePerSubscription,
396+
false,
397+
true,
398+
List.of(),
399+
100L);
393400
final ProtocolAdapterEntity protocolAdapterEntity = new ProtocolAdapterEntity("adapterId",
394401
"protocolId",
395402
1,
@@ -410,8 +417,14 @@ public void whenNorthboundMappingTagNameOrTopicIsEmpty_setConfigurationShouldRet
410417
final ConfigFileReaderWriter configFileReader = getConfigFileReaderWriter();
411418
final HiveMQConfigEntity entity = configFileReader.applyConfig();
412419
assertThat(entity).isNotNull();
413-
final NorthboundMappingEntity northboundMappingEntity =
414-
new NorthboundMappingEntity(tagName, topic, 1, false, true, List.of(), 100L);
420+
final NorthboundMappingEntity northboundMappingEntity = new NorthboundMappingEntity(tagName,
421+
topic,
422+
1,
423+
MessageHandlingOptions.MQTTMessagePerSubscription,
424+
false,
425+
true,
426+
List.of(),
427+
100L);
415428
final ProtocolAdapterEntity protocolAdapterEntity = new ProtocolAdapterEntity("adapterId",
416429
"protocolId",
417430
1,

hivemq-edge/src/test/java/com/hivemq/extensions/services/executor/ManagedExecutorServicePerExtensionTest.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void test_execute() throws InterruptedException {
100100

101101
managedExecutorServicePerExtension.execute(runLatch::countDown);
102102

103-
assertTrue(runLatch.await(5, TimeUnit.SECONDS));
103+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
104104
}
105105

106106
@Test
@@ -111,7 +111,7 @@ public void test_execute_plugin_stopped() throws Exception {
111111

112112
managedExecutorServicePerExtension.execute(runLatch::countDown);
113113

114-
assertTrue(runLatch.await(1, TimeUnit.SECONDS));
114+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
115115
}
116116

117117
@Test
@@ -129,7 +129,7 @@ public void test_schedule_runnable() throws Exception {
129129
});
130130

131131
assertFalse(runLatch.await(250, TimeUnit.MILLISECONDS));
132-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
132+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
133133

134134
futureLatch.await();
135135

@@ -145,37 +145,37 @@ public void test_schedule_runnable_plugin_stopped() throws Exception {
145145
final CompletableScheduledFuture<?> schedule =
146146
managedExecutorServicePerExtension.schedule(runLatch::countDown, 500, TimeUnit.MILLISECONDS);
147147

148-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
148+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
149149

150150
schedule.get();
151151

152152
assertFalse(schedule.isCancelled());
153153
assertTrue(schedule.isDone());
154154
}
155155

156-
@Test(timeout = 5000)
156+
@Test(timeout = 10000)
157157
public void test_schedule_callable() throws Exception {
158158
final CountDownLatch runLatch = new CountDownLatch(1);
159159
final CountDownLatch futureLatch = new CountDownLatch(1);
160160

161161
final CompletableScheduledFuture<String> scheduledFuture = managedExecutorServicePerExtension.schedule(() -> {
162162
runLatch.countDown();
163163
return "test";
164-
}, 50, TimeUnit.MILLISECONDS);
164+
}, 500, TimeUnit.MILLISECONDS);
165165

166166
scheduledFuture.whenComplete((string, throwable) -> {
167167
if (string.equals("test")) {
168168
futureLatch.countDown();
169169
}
170170
});
171171

172-
assertFalse(runLatch.await(25, TimeUnit.MILLISECONDS));
172+
assertFalse(runLatch.await(250, TimeUnit.MILLISECONDS));
173173
assertFalse(futureLatch.await(0, TimeUnit.MILLISECONDS));
174-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
175-
assertTrue(futureLatch.await(2, TimeUnit.SECONDS));
174+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
175+
assertTrue(futureLatch.await(10, TimeUnit.SECONDS));
176176
}
177177

178-
@Test(timeout = 5000)
178+
@Test(timeout = 10000)
179179
public void test_schedule_callable_cancelled() throws Exception {
180180
final CountDownLatch runLatch = new CountDownLatch(1);
181181
final CountDownLatch exceptionLatch = new CountDownLatch(1);
@@ -198,7 +198,7 @@ public void test_schedule_callable_cancelled() throws Exception {
198198
assertTrue(exceptionLatch.await(2, TimeUnit.SECONDS));
199199
}
200200

201-
@Test(timeout = 5000)
201+
@Test(timeout = 10000)
202202
public void test_schedule_callable_plugin_stopped() throws Exception {
203203
final CountDownLatch runLatch = new CountDownLatch(1);
204204

@@ -227,7 +227,7 @@ public void test_schedule_at_fixed_rate() throws Exception {
227227
final CompletableScheduledFuture<?> scheduledFuture =
228228
managedExecutorServicePerExtension.scheduleAtFixedRate(task, 10, 10, TimeUnit.MILLISECONDS);
229229

230-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
230+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
231231

232232
final long delay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
233233
assertTrue("bad delay: " + delay, delay <= 10);
@@ -240,7 +240,7 @@ public void test_schedule_at_fixed_rate() throws Exception {
240240
scheduledFuture.cancel(true);
241241

242242
// completes by cancellation
243-
assertTrue(completeLatch.await(1, TimeUnit.SECONDS));
243+
assertTrue(completeLatch.await(10, TimeUnit.SECONDS));
244244
}
245245

246246
@Test
@@ -259,7 +259,7 @@ public void test_schedule_at_fixed_rate_canceled() throws Exception {
259259
final CompletableScheduledFuture<?> scheduledFuture =
260260
managedExecutorServicePerExtension.scheduleAtFixedRate(task, 10, 10, TimeUnit.MILLISECONDS);
261261

262-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
262+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
263263

264264
final long delay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
265265
assertTrue("bad delay: " + delay, delay <= 100);
@@ -293,7 +293,7 @@ public void test_schedule_with_fixed_delay() throws Exception {
293293
final CompletableScheduledFuture<?> scheduledFuture =
294294
managedExecutorServicePerExtension.scheduleWithFixedDelay(task, 10, 10, TimeUnit.MILLISECONDS);
295295

296-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
296+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
297297

298298
final long delay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
299299
assertTrue("bad delay: " + delay, delay <= 10);
@@ -308,7 +308,7 @@ public void test_schedule_with_fixed_delay() throws Exception {
308308
scheduledFuture.cancel(true);
309309

310310
// completes by cancellation
311-
assertTrue(completeLatch.await(1, TimeUnit.SECONDS));
311+
assertTrue(completeLatch.await(10, TimeUnit.SECONDS));
312312
}
313313

314314
@Test
@@ -332,7 +332,7 @@ public void test_schedule_with_fixed_delay_exceptional() throws Exception {
332332
final CompletableScheduledFuture<?> scheduledFuture =
333333
managedExecutorServicePerExtension.scheduleWithFixedDelay(task, 10, 10, TimeUnit.MILLISECONDS);
334334

335-
assertTrue(runLatch.await(2, TimeUnit.SECONDS));
335+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
336336

337337
final long delay = scheduledFuture.getDelay(TimeUnit.MILLISECONDS);
338338
assertTrue("bad delay: " + delay, delay <= 10);
@@ -378,7 +378,7 @@ public void test_submit_callable() throws Exception {
378378
}
379379
});
380380

381-
assertTrue(callableLatch.await(1, TimeUnit.SECONDS));
381+
assertTrue(callableLatch.await(10, TimeUnit.SECONDS));
382382
}
383383

384384
@Test
@@ -399,7 +399,7 @@ public void test_submit_callable_exceptional() throws Exception {
399399
}
400400
});
401401

402-
assertTrue(callableLatch.await(1, TimeUnit.SECONDS));
402+
assertTrue(callableLatch.await(10, TimeUnit.SECONDS));
403403

404404
assertTrue(submit.isCompletedExceptionally());
405405

@@ -418,7 +418,7 @@ public void test_submit_runnable() throws Exception {
418418

419419
submit.whenComplete((s, throwable) -> runLatch.countDown());
420420

421-
assertTrue(runLatch.await(1, TimeUnit.SECONDS));
421+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
422422
}
423423

424424
@Test
@@ -437,7 +437,7 @@ public void test_submit_runnable_exceptional() throws Exception {
437437
runLatch.countDown();
438438
});
439439

440-
assertTrue(runLatch.await(1, TimeUnit.SECONDS));
440+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
441441

442442
assertTrue(submit.isCompletedExceptionally());
443443

@@ -456,7 +456,7 @@ public void test_submit_runnable_with_result() throws Exception {
456456

457457
submit.whenComplete((s, throwable) -> s.countDown());
458458

459-
assertTrue(runLatch.await(1, TimeUnit.SECONDS));
459+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
460460
assertTrue(submit.isDone());
461461
assertFalse(submit.isCompletedExceptionally());
462462
}
@@ -474,7 +474,7 @@ public void test_submit_runnable_with_result_plugin_stopped() throws Exception {
474474

475475
submit.whenComplete((s, throwable) -> s.countDown());
476476

477-
assertTrue(runLatch.await(1, TimeUnit.SECONDS));
477+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
478478
assertTrue(submit.isDone());
479479
assertFalse(submit.isCancelled());
480480
}
@@ -498,7 +498,7 @@ public void test_submit_runnable_with_result_exceptional() throws Exception {
498498
}
499499
});
500500

501-
assertTrue(runLatch.await(5, TimeUnit.SECONDS));
501+
assertTrue(runLatch.await(10, TimeUnit.SECONDS));
502502

503503
assertTrue(submit.isDone());
504504
assertTrue(submit.isCompletedExceptionally());
@@ -530,7 +530,7 @@ public void test_invokeAll_callable() throws Exception {
530530
});
531531
}
532532

533-
assertTrue(invokeAllLatch.await(1, TimeUnit.SECONDS));
533+
assertTrue(invokeAllLatch.await(10, TimeUnit.SECONDS));
534534
}
535535

536536
@Test

hivemq-edge/src/test/resources/configs/simulation/simulation-adapter-full-config.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
<topic>my/topic</topic>
1919
<maxQos>1</maxQos>
2020
<tagName>tag</tagName>
21+
<messageHandlingOptions>MQTTMessagePerSubscription</messageHandlingOptions>
2122
<includeTagNames>true</includeTagNames>
2223
<includeTimestamp>false</includeTimestamp>
2324
<mqttUserProperties>
@@ -31,6 +32,7 @@
3132
<topic>my/topic/2</topic>
3233
<maxQos>1</maxQos>
3334
<tagName>tag</tagName>
35+
<messageHandlingOptions>MQTTMessagePerSubscription</messageHandlingOptions>
3436
<includeTagNames>true</includeTagNames>
3537
<includeTimestamp>false</includeTimestamp>
3638
<mqttUserProperties>

0 commit comments

Comments
 (0)