diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfig.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfig.java
new file mode 100644
index 0000000..d466b6a
--- /dev/null
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfig.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright (c) Orange. All Rights Reserved.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+package com.orange.lo.sample.mqtt2eventhub;
+
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+@Configuration
+public class ApplicationConfig {
+
+ @Bean
+ public Queue messageQueue() {
+ return new ConcurrentLinkedQueue<>();
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java
index 5c3b575..5edb2a9 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java
@@ -22,6 +22,7 @@
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -59,16 +60,28 @@ private void init() {
}
}
+ public void sendSync(List messages) throws EventHubClientFacadeException {
+ List list = messages.stream().map(msg -> {
+ byte[] bytes = msg.getBytes(Charset.defaultCharset());
+ return EventData.create(bytes);
+ }).toList();
+ sendSync(list);
+ }
+
public void sendSync(String msg) throws EventHubClientFacadeException {
byte[] payloadBytes = msg.getBytes(Charset.defaultCharset());
sendSync(payloadBytes);
}
- public void sendSync(byte[] msg) throws EventHubClientFacadeException {
- EventData sendEvent = EventData.create(msg);
+ public void sendSync(byte[] payloadBytes) throws EventHubClientFacadeException {
+ EventData sendEvent = EventData.create(payloadBytes);
+ List list = List.of(sendEvent);
+ sendSync(list);
+ }
+ public void sendSync(Iterable eventDatas) throws EventHubClientFacadeException {
try {
- eventHubClient.sendSync(sendEvent);
+ eventHubClient.sendSync(eventDatas);
} catch (NullPointerException e) {
throw new EventHubClientFacadeException(EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE);
} catch (EventHubException e) {
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java
index f79e94a..65da32e 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java
@@ -27,6 +27,7 @@ public class EventHubProperties {
@DurationUnit(ChronoUnit.MILLIS)
private Duration throttlingDelay = Duration.ofMillis(1);
private int maxSendAttempts = 3;
+ private Integer messageBatchSize;
public String getNameSpace() {
return nameSpace;
@@ -99,4 +100,12 @@ public int getMaxSendAttempts() {
public void setMaxSendAttempts(int maxSendAttempts) {
this.maxSendAttempts = maxSendAttempts;
}
+
+ public Integer getMessageBatchSize() {
+ return messageBatchSize;
+ }
+
+ public void setMessageBatchSize(Integer messageBatchSize) {
+ this.messageBatchSize = messageBatchSize;
+ }
}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java
index ffdce9f..0757c7f 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java
@@ -7,71 +7,78 @@
package com.orange.lo.sample.mqtt2eventhub.evthub;
-import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoProperties;
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
-import com.orange.lo.sdk.LOApiClient;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
@Component
+@EnableScheduling
public class EventHubSender {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int DEFAULT_BATCH_SIZE = 10;
private final EventHubClientFacade eventHubClientFacade;
private final Counters counters;
private final EventHubProperties eventHubProperties;
- private final LoProperties loProperties;
private final ExecutorService executorService;
- private final LOApiClient loApiClient;
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+ private final LoService loService;
+ private final Queue messageQueue;
EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
- LoProperties loProperties, ExecutorService executorService, LOApiClient loApiClient,
- ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
+ ExecutorService executorService, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
+ LoService loService, Queue messageQueue) {
this.eventHubClientFacade = eventHubClientFacade;
this.counters = counters;
this.eventHubProperties = eventHubProperties;
- this.loProperties = loProperties;
this.executorService = executorService;
- this.loApiClient = loApiClient;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
+ this.loService = loService;
+ this.messageQueue = messageQueue;
}
- public void send(int loMessageId, String msg) {
-
+ public void send(List messages) {
Failsafe.with(
new RetryPolicy()
.withMaxAttempts(eventHubProperties.getMaxSendAttempts())
.withBackoff(eventHubProperties.getThrottlingDelay().toMillis(), Duration.ofMinutes(1).toMillis(), ChronoUnit.MILLIS)
.onRetry(r -> {
LOG.debug("Problem while sending message to Event Hub because of: {}. Retrying...", r.getLastFailure().getMessage());
- counters.getMesasageSentAttemptFailedCounter().increment();
+ counters.getMesasageSentAttemptFailedCounter().increment(messages.size());
})
.onSuccess(r -> {
- LOG.debug("Message was sent to Event Hub");
- counters.getMesasageSentCounter().increment();
- loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
+ LOG.debug("Batch of messages of the following size were sent: {}", messages.size());
+ counters.getMesasageSentCounter().increment(messages.size());
+ messages.forEach(m -> loService.sendAck(m.messageId()));
})
.onFailure(r -> {
- LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, r.getFailure());
- counters.getMesasageSentFailedCounter().increment();
- loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
+ LOG.error("Cannot send messages to Event Hub because of {}", r.getFailure());
+ counters.getMesasageSentFailedCounter().increment(messages.size());
+ messages.forEach(m -> loService.sendAck(m.messageId()));
})
).with(executorService).run(execution -> {
- counters.getMesasageSentAttemptCounter().increment();
+ counters.getMesasageSentAttemptCounter().increment(messages.size());
try {
- eventHubClientFacade.sendSync(msg);
+ List messageContentList = messages.stream().map(LoMessage::message).toList();
+ eventHubClientFacade.sendSync(messageContentList);
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
} catch (EventHubClientFacadeException e) {
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
@@ -90,4 +97,28 @@ private void checkConnection() {
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
}
}
+
+ @Scheduled(fixedRateString = "${azure.evt-hub.synchronization-interval}")
+ public void send() {
+ LOG.debug("Number of messages waiting to be sent: {}", messageQueue.size());
+ if (!messageQueue.isEmpty()) {
+ LOG.info("Start sending messages...");
+
+ int batchSize = eventHubProperties.getMessageBatchSize() != null
+ ? eventHubProperties.getMessageBatchSize() : DEFAULT_BATCH_SIZE;
+
+ List messageBatch = new ArrayList<>(batchSize);
+ while (!messageQueue.isEmpty()) {
+ messageBatch.add(messageQueue.poll());
+ if (messageBatch.size() == batchSize) {
+ send(new ArrayList<>(messageBatch));
+ messageBatch.clear();
+ }
+ }
+ if (!messageBatch.isEmpty()) {
+ send(new ArrayList<>(messageBatch));
+ }
+ }
+ }
+
}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java
index 94fe5d2..17f8927 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java
@@ -9,6 +9,8 @@
import com.orange.lo.sdk.LOApiClient;
import com.orange.lo.sdk.LOApiClientParameters;
+import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
+import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
@@ -25,12 +27,14 @@
public class LoConfig {
private final LoProperties loProperties;
- private final LoMqttHandler loMqttHandler;
+ private final DataManagementFifoCallback loMqttHandler;
+ private final DataManagementReconnectCallback reconnectHandler;
- public LoConfig(LoProperties loProperties, LoMqttHandler loMqttHandler) {
+ public LoConfig(LoProperties loProperties, DataManagementFifoCallback loMqttHandler, DataManagementReconnectCallback reconnectHandler) {
this.loProperties = loProperties;
this.loMqttHandler = loMqttHandler;
- }
+ this.reconnectHandler = reconnectHandler;
+ }
@Bean
public LOApiClient loApiClient() {
@@ -51,6 +55,7 @@ private LOApiClientParameters loApiClientParameters() {
.dataManagementMqttCallback(loMqttHandler)
.connectorType(loProperties.getConnectorType())
.connectorVersion(getConnectorVersion())
+ .dataManagementReconnectCallback(reconnectHandler)
.build();
}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMessage.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMessage.java
new file mode 100644
index 0000000..18a8916
--- /dev/null
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMessage.java
@@ -0,0 +1,5 @@
+package com.orange.lo.sample.mqtt2eventhub.liveobjects;
+
+public record LoMessage(int messageId, String message) {
+
+}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java
index 323eda3..1a93dca 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java
@@ -7,43 +7,35 @@
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
-import com.orange.lo.sample.mqtt2eventhub.evthub.EventHubSender;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
-import com.orange.lo.sdk.LOApiClient;
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
+import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.lang.invoke.MethodHandles;
+import java.util.Queue;
@Component
public class LoMqttHandler implements DataManagementFifoCallback {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final EventHubSender eventHubSender;
- private final Counters counters;
- private final LOApiClient loApiClient;
- private final LoProperties loProperties;
+ private final Counter mesasageReadCounter;
+ private final Queue messageQueue;
- public LoMqttHandler(@Lazy EventHubSender eventHubSender, @Lazy LOApiClient loApiClient, Counters counters, LoProperties loProperties) {
- this.eventHubSender = eventHubSender;
- this.counters = counters;
- this.loApiClient = loApiClient;
- this.loProperties = loProperties;
+ public LoMqttHandler(Counters counterProvider, Queue messageQueue) {
+ LOG.info("LoMqttHandler init...");
+
+ this.mesasageReadCounter = counterProvider.getMesasageReadCounter();
+ this.messageQueue = messageQueue;
}
@Override
- public void onMessage(int loMessageId, String message) {
- counters.getMesasageReadCounter().increment();
- try {
- eventHubSender.send(loMessageId, message);
- } catch (Exception e) {
- LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, e);
- counters.getMesasageSentFailedCounter().increment();
- loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
- }
+ public void onMessage(int messageId, String message) {
+ LOG.debug("Received message with id = {}", messageId);
+ mesasageReadCounter.increment();
+ messageQueue.add(new LoMessage(messageId, message));
}
}
\ No newline at end of file
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandler.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandler.java
new file mode 100644
index 0000000..174f628
--- /dev/null
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (c) Orange. All Rights Reserved.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+package com.orange.lo.sample.mqtt2eventhub.liveobjects;
+
+import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
+import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
+import org.springframework.stereotype.Component;
+
+@Component
+public class LoMqttReconnectHandler implements DataManagementReconnectCallback {
+
+ private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+
+ public LoMqttReconnectHandler(ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
+ this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
+ }
+
+ @Override
+ public void connectComplete(boolean b, String s) {
+ connectorHealthActuatorEndpoint.setLoConnectionStatus(true);
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoProperties.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoProperties.java
index a785704..8105b58 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoProperties.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoProperties.java
@@ -15,7 +15,7 @@
public class LoProperties {
private static final String CONNECTOR_TYPE = "LO_AZURE_EVENTHUB_ADAPTER";
-
+
private final String hostname;
private final String apiKey;
private final String topic;
@@ -47,7 +47,7 @@ public LoProperties(
public String getConnectorType() {
return CONNECTOR_TYPE;
}
-
+
public String getHostname() {
return hostname;
}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java
index 9c85b5c..0290651 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java
@@ -25,11 +25,14 @@ public class LoService {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final DataManagementFifo dataManagementFifo;
+ private final LoProperties loProperties;
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
- public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
+ public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
+ LoProperties loProperties) {
this.dataManagementFifo = loApiClient.getDataManagementFifo();
+ this.loProperties = loProperties;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
}
@@ -50,4 +53,9 @@ public void start() {
public void stop() {
dataManagementFifo.disconnect();
}
+
+ public void sendAck(int loMessageId) {
+ dataManagementFifo.sendAck(loMessageId, loProperties.getMessageQos());
+ }
+
}
diff --git a/src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpoint.java b/src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpoint.java
index 585c24b..3843ae7 100644
--- a/src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpoint.java
+++ b/src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpoint.java
@@ -1,7 +1,5 @@
package com.orange.lo.sample.mqtt2eventhub.utils;
-import com.orange.lo.sdk.LOApiClient;
-import com.orange.lo.sdk.fifomqtt.DataManagementFifo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
@@ -10,14 +8,9 @@
@Component
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {
- LOApiClient loApiClient;
boolean cloudConnectionStatus = true;
boolean loConnectionStatus = true;
- public ConnectorHealthActuatorEndpoint(LOApiClient loApiClient) {
- this.loApiClient = loApiClient;
- }
-
@Override
public Health getHealth(boolean includeDetails) {
return HealthIndicator.super.getHealth(includeDetails);
@@ -26,14 +19,12 @@ public Health getHealth(boolean includeDetails) {
@Override
public Health health() {
Health.Builder builder = new Health.Builder(Status.UP);
- DataManagementFifo dataManagementFifo = loApiClient.getDataManagementFifo();
- builder.withDetail("loMqttConnectionStatus", dataManagementFifo.isConnected());
+ builder.withDetail("loMqttConnectionStatus", loConnectionStatus);
builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);
return builder.build();
}
-
public void setCloudConnectionStatus(boolean cloudConnectionStatus) {
this.cloudConnectionStatus = cloudConnectionStatus;
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index e79c4fd..95be4c9 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -19,6 +19,8 @@ azure:
connection-timeout: 5000
throttling-delay: 5ms
max-send-attempts: 3
+ synchronization-interval: 10000
+ message-batch-size: 10
metrics:
send-to-cloudwatch: false
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfigTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfigTest.java
new file mode 100644
index 0000000..7f7f824
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/ApplicationConfigTest.java
@@ -0,0 +1,27 @@
+package com.orange.lo.sample.mqtt2eventhub;
+
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ApplicationConfigTest {
+
+ private ApplicationConfig applicationConfig;
+
+ @BeforeEach
+ void setUp() {
+ applicationConfig = new ApplicationConfig();
+ }
+
+ @Test
+ void messageQueueShouldBeThreadSafe() {
+ Queue loMessages = applicationConfig.messageQueue();
+
+ assertInstanceOf(ConcurrentLinkedQueue.class, loMessages);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSenderTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSenderTest.java
new file mode 100644
index 0000000..307df27
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSenderTest.java
@@ -0,0 +1,144 @@
+package com.orange.lo.sample.mqtt2eventhub.evthub;
+
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
+import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
+import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
+import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
+import io.micrometer.core.instrument.Counter;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class EventHubSenderTest {
+ @Mock
+ private EventHubClientFacade eventHubClientFacade;
+ @Mock
+ private Counters counters;
+ @Mock
+ private Counter counter;
+ @Mock
+ private EventHubProperties eventHubProperties;
+ @Mock
+ private ExecutorService executorService;
+ @Mock
+ private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+ @Mock
+ private LoService loService;
+
+ EventHubSender eventHubSender;
+
+ @BeforeEach
+ void setUp() {
+ prepareService(new LinkedList<>());
+ }
+
+ private void prepareService(LinkedList messageQueue) {
+ eventHubSender = new EventHubSender(eventHubClientFacade, counters, eventHubProperties, executorService,
+ connectorHealthActuatorEndpoint, loService, messageQueue);
+ }
+
+ @Test
+ void shouldNotSendMessagesIfQueueIsEmpty() throws EventHubClientFacadeException {
+ // when
+ eventHubSender.send();
+
+ // then
+ verify(eventHubClientFacade, never()).sendSync(ArgumentMatchers.anyList());
+ }
+
+ @Test
+ void shouldSendMessagesInOneBatchIfQueueNotExceedMessageBatchSizeProperty() throws EventHubClientFacadeException {
+ // given
+ int batchSize = 5;
+
+ when(eventHubProperties.getMessageBatchSize()).thenReturn(batchSize);
+ when(eventHubProperties.getMaxSendAttempts()).thenReturn(10);
+ when(eventHubProperties.getThrottlingDelay()).thenReturn(Duration.ofMillis(1));
+ when(counters.getMesasageSentAttemptCounter()).thenReturn(counter);
+
+ LinkedList messageQueue = getExampleMessageQueue(batchSize);
+
+ prepareService(messageQueue);
+
+ List expectedMessages = messageQueue.stream().map(LoMessage::message).toList();
+
+ // when
+ eventHubSender.send();
+
+ // then
+ verify(eventHubClientFacade, times(1)).sendSync(expectedMessages);
+ }
+
+ @Test
+ void shouldSplitMessagesIntoPacketsIfQueueExceedMessageBatchSizeProperty() throws EventHubClientFacadeException {
+ // given
+ int batchSize = 5;
+ int totalLength = batchSize + 1;
+
+ when(eventHubProperties.getMessageBatchSize()).thenReturn(batchSize);
+ when(eventHubProperties.getMaxSendAttempts()).thenReturn(10);
+ when(eventHubProperties.getThrottlingDelay()).thenReturn(Duration.ofMillis(1));
+ when(counters.getMesasageSentAttemptCounter()).thenReturn(counter);
+
+ LinkedList messageQueue = getExampleMessageQueue(totalLength);
+
+ prepareService(messageQueue);
+
+ List expectedMessages1 = (new LinkedList<>(messageQueue)).subList(0, batchSize)
+ .stream()
+ .map(LoMessage::message)
+ .toList();
+ List expectedMessages2 = (new LinkedList<>(messageQueue)).subList(batchSize, totalLength)
+ .stream()
+ .map(LoMessage::message)
+ .toList();
+
+ // when
+ eventHubSender.send();
+
+ // then
+ verify(eventHubClientFacade, times(1)).sendSync(expectedMessages1);
+ verify(eventHubClientFacade, times(1)).sendSync(expectedMessages2);
+ }
+
+ private LinkedList getExampleMessageQueue(int batchSize) {
+ return IntStream.range(1, batchSize + 1)
+ .mapToObj(i -> new LoMessage(i, String.format("Message %d", i)))
+ .collect(Collectors.toCollection(LinkedList::new));
+ }
+
+ class IsListWithGivenSize implements ArgumentMatcher {
+ int size;
+
+ public IsListWithGivenSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public boolean matches(List list) {
+ return list.size() == size;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfigTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfigTest.java
new file mode 100644
index 0000000..08fbe09
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfigTest.java
@@ -0,0 +1,55 @@
+package com.orange.lo.sample.mqtt2eventhub.liveobjects;
+
+import com.orange.lo.sdk.LOApiClient;
+import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
+import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith({SpringExtension.class})
+@EnableConfigurationProperties(LoProperties.class)
+@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
+@ActiveProfiles("unittest")
+class LoConfigTest {
+
+ private DataManagementFifoCallback dataManagementFifoCallback = (i, s) -> {
+
+ };
+ private final DataManagementReconnectCallback dataManagementReconnectCallback = new DataManagementReconnectCallback() {
+
+ @Override
+ public void connectComplete(boolean b, String s) {
+
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+
+ }
+ };
+
+ @Autowired
+ private LoProperties loProperties;
+ private LoConfig loConfig;
+
+ @BeforeEach
+ void setUp() {
+ loConfig = new LoConfig(loProperties, dataManagementFifoCallback, dataManagementReconnectCallback);
+ }
+
+ @Test
+ void shouldInitLoApiClient() {
+ LOApiClient loApiClient = loConfig.loApiClient();
+
+ assertNotNull(loApiClient);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandlerTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandlerTest.java
index d35e4df..489abfb 100644
--- a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandlerTest.java
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandlerTest.java
@@ -19,37 +19,45 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class LoMqttHandlerTest {
- @Mock
- private EventHubSender eventHubSender;
- @Mock
- private LOApiClient loApiClient;
- @Mock
- private LoProperties loProperties;
@Mock
private Counters counters;
@Mock
- private Counter counter;
- private LoMqttHandler loMqttHandler;
+ private Counter receivedCounter;
+ private Queue messageQueue;
+ private LoMqttHandler handler;
@BeforeEach
void setUp() {
- when(counters.getMesasageReadCounter()).thenReturn(counter);
- loMqttHandler = new LoMqttHandler(eventHubSender, loApiClient, counters, loProperties);
+ when(counters.getMesasageReadCounter()).thenReturn(receivedCounter);
+ messageQueue = new LinkedList<>();
+ handler = new LoMqttHandler(counters, messageQueue);
}
@Test
- void shouldCallEvtHubSenderAndCounterWhenMessageIsHandled() {
- String message = "{}";
- int loMessageId = 1;
+ public void shouldIncrementCounterOnMessage() {
+ // when
+ handler.onMessage(1, "test message");
- loMqttHandler.onMessage(loMessageId, message);
+ // then
+ verify(receivedCounter, times(1)).increment();
+ }
+
+ @Test
+ public void shouldAddMessageToQueueOnMessage() {
+ // when
+ handler.onMessage(1, "test message");
- verify(counter, times(1)).increment();
- verify(eventHubSender, times(1)).send(loMessageId, message);
+ // then
+ assertEquals(1, messageQueue.size());
+ assertEquals(new LoMessage(1, "test message"), messageQueue.peek());
}
}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandlerTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandlerTest.java
new file mode 100644
index 0000000..60de095
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandlerTest.java
@@ -0,0 +1,38 @@
+package com.orange.lo.sample.mqtt2eventhub.liveobjects;
+
+import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class LoMqttReconnectHandlerTest {
+
+ @Mock
+ private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+ private LoMqttReconnectHandler loMqttReconnectHandler;
+
+ @BeforeEach
+ void setUp() {
+ this.loMqttReconnectHandler = new LoMqttReconnectHandler(connectorHealthActuatorEndpoint);
+ }
+
+ @Test
+ void shouldChangeLoConnectionStausWhenConnectComplete() {
+ loMqttReconnectHandler.connectComplete(false, "");
+
+ verify(connectorHealthActuatorEndpoint, times(1)).setLoConnectionStatus(true);
+ }
+
+ @Test
+ void shouldChangeLoConnectionStausWhenConnectionLost() {
+ loMqttReconnectHandler.connectionLost(new Exception());
+
+ verify(connectorHealthActuatorEndpoint, times(1)).setLoConnectionStatus(false);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoPropertiesTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoPropertiesTest.java
new file mode 100644
index 0000000..c6365f5
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoPropertiesTest.java
@@ -0,0 +1,37 @@
+package com.orange.lo.sample.mqtt2eventhub.liveobjects;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@ExtendWith({SpringExtension.class})
+@EnableConfigurationProperties(LoProperties.class)
+@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
+@ActiveProfiles("unittest")
+class LoPropertiesTest {
+
+ @Autowired
+ private LoProperties loProperties;
+
+ @Test
+ void shouldBindLoPropertiesFromYamlConfiguration() {
+ Assertions.assertAll(
+ () -> assertEquals("invalid.url.liveobjects.orange-business.com", this.loProperties.getHostname()),
+ () -> assertEquals("test", this.loProperties.getApiKey()),
+ () -> assertEquals("dev", this.loProperties.getTopic()),
+ () -> assertEquals(30, this.loProperties.getKeepAliveIntervalSeconds()),
+ () -> assertEquals(true, this.loProperties.getAutomaticReconnect()),
+ () -> assertEquals(1, this.loProperties.getMessageQos()),
+ () -> assertEquals("./temp/", this.loProperties.getMqttPersistenceDir()),
+ () -> assertEquals(30000, this.loProperties.getConnectionTimeout())
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoServiceTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoServiceTest.java
index a0bdd7f..fddce65 100644
--- a/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoServiceTest.java
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoServiceTest.java
@@ -26,6 +26,8 @@ class LoServiceTest {
@Mock
private LOApiClient loApiClient;
@Mock
+ private LoProperties loProperties;
+ @Mock
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
@BeforeEach
@@ -38,7 +40,7 @@ void startShouldConnectAndSubscribe() {
when(connectorHealthActuatorEndpoint.isLoConnectionStatus()).thenReturn(true);
when(connectorHealthActuatorEndpoint.isCloudConnectionStatus()).thenReturn(true);
- LoService loService = new LoService(loApiClient, connectorHealthActuatorEndpoint);
+ LoService loService = new LoService(loApiClient, connectorHealthActuatorEndpoint, loProperties);
loService.start();
verify(dataManagementFifo, times(1)).connectAndSubscribe();
@@ -46,9 +48,17 @@ void startShouldConnectAndSubscribe() {
@Test
void shouldInvokeDisconnect() {
- LoService loService = new LoService(loApiClient, connectorHealthActuatorEndpoint);
+ LoService loService = new LoService(loApiClient, connectorHealthActuatorEndpoint, loProperties);
loService.stop();
verify(dataManagementFifo, times(1)).disconnect();
}
+ @Test
+ void shouldSendAck() {
+ LoService loService = new LoService(loApiClient, connectorHealthActuatorEndpoint, loProperties);
+ int messageId = 12345;
+ loService.sendAck(messageId);
+
+ verify(dataManagementFifo, times(1)).sendAck(messageId, 0);
+ }
}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpointTest.java b/src/test/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpointTest.java
index 6e3e41c..36aeffd 100644
--- a/src/test/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpointTest.java
+++ b/src/test/java/com/orange/lo/sample/mqtt2eventhub/utils/ConnectorHealthActuatorEndpointTest.java
@@ -1,13 +1,12 @@
package com.orange.lo.sample.mqtt2eventhub.utils;
-import com.orange.lo.sdk.LOApiClient;
-import com.orange.lo.sdk.fifomqtt.DataManagementFifo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mockito;
+import org.springframework.boot.actuate.health.Health;
+import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -19,10 +18,7 @@ class ConnectorHealthActuatorEndpointTest {
@BeforeEach
void setUp() {
- DataManagementFifo dataManagementFifo = Mockito.mock(DataManagementFifo.class);
- LOApiClient loApiClient = Mockito.mock(LOApiClient.class);
- Mockito.when(loApiClient.getDataManagementFifo()).thenReturn(dataManagementFifo);
- this.connectorHealthActuatorEndpoint = new ConnectorHealthActuatorEndpoint(loApiClient);
+ this.connectorHealthActuatorEndpoint = new ConnectorHealthActuatorEndpoint();
}
@ParameterizedTest
@@ -34,7 +30,33 @@ void checkCloudConnectionStatus(boolean isConnected) {
.get("cloudConnectionStatus");
// then
- assertEquals(cloudConnectionStatus, isConnected);
+ assertEquals(isConnected, cloudConnectionStatus);
+ assertEquals(isConnected, connectorHealthActuatorEndpoint.isCloudConnectionStatus());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideTestData")
+ void checkLoConnectionStatus(boolean isConnected) {
+ // when
+ connectorHealthActuatorEndpoint.setLoConnectionStatus(isConnected);
+ boolean loMqttConnectionStatus = (boolean) connectorHealthActuatorEndpoint.health().getDetails()
+ .get("loMqttConnectionStatus");
+
+ // then
+ assertEquals(isConnected, loMqttConnectionStatus);
+ assertEquals(isConnected, connectorHealthActuatorEndpoint.isLoConnectionStatus());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideTestData")
+ void checkHealth(boolean includeDetails) {
+ // when
+ Health health = connectorHealthActuatorEndpoint.getHealth(includeDetails);
+ Map details = health.getDetails();
+ int expectedDetailsSize = includeDetails ? 2 : 0;
+
+ // then
+ assertEquals(expectedDetailsSize, details.size());
}
private static Stream provideTestData() {
diff --git a/src/test/resources/application.yml b/src/test/resources/application-unittest.yml
similarity index 75%
rename from src/test/resources/application.yml
rename to src/test/resources/application-unittest.yml
index 3f24948..c6a82ca 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application-unittest.yml
@@ -5,20 +5,22 @@ lo:
keep-alive-interval-seconds: 30
automatic-reconnect: true
message-qos: 1
- mqtt-persistence-dir: ${basedir:.}/temp/
+ mqtt-persistence-dir: ./temp/
connection-timeout: 30000
azure:
evt-hub:
- name-space:
- evt-hub-name:
+ name-space: eventhub-name-space
+ evt-hub-name: eventhub-name
sas-key-name: RootManageSharedAccessKey
- sas-key:
+ sas-key: EX4MPLE5ASK3Y
thread-pool-size: 40
task-queue-size: 150000
connection-timeout: 5000
throttling-delay: 5000
max-send-attempts: 3
+ synchronization-interval: 10000
+ message-batch-size: 10
metrics:
namespace: CCS