multiplexedDeviceClientManagers;
- private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+ private final Counters counters;
private final IoTDeviceProvider ioTDeviceProvider;
private final int multiplexingClientId;
private IotHubConnectionStatus multiplexedConnectionStatus;
@@ -46,10 +46,10 @@ public class MultiplexingClientManager implements IotHubConnectionStatusChangeCa
private ScheduledExecutorService registerTaskScheduler = Executors.newScheduledThreadPool(1);
private int reservations;
- public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider) {
+ public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, Counters counters, IoTDeviceProvider ioTDeviceProvider) {
this.multiplexingClient = new MultiplexingClient(azureIotHubProperties.getIotHostName(), IotHubClientProtocol.AMQPS, null);
this.multiplexedDeviceClientManagers = new HashMap<>();
- this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
+ this.counters = counters;
this.ioTDeviceProvider = ioTDeviceProvider;
this.multiplexingClientId = multiplexingClientIndex.getAndIncrement();
this.multiplexingClient.setConnectionStatusChangeCallback(this, this);
@@ -67,7 +67,7 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange
IotHubConnectionStatusChangeReason statusChangeReason = connectionStatusChangeContext.getNewStatusReason();
Throwable throwable = connectionStatusChangeContext.getCause();
- connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(multiplexingClient, multiplexedConnectionStatus);
+ counters.setCloudConnectionStatus(IotHubConnectionStatus.CONNECTED.equals(multiplexedConnectionStatus));
if (throwable == null) {
LOG.info("Connection status changed for multiplexing client: {}, status: {}, reason: {}", multiplexingClientId, multiplexedConnectionStatus, statusChangeReason);
diff --git a/src/main/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandler.java b/src/main/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandler.java
new file mode 100644
index 0000000..27b51cd
--- /dev/null
+++ b/src/main/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright (c) Orange. All Rights Reserved.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+package com.orange.lo.sample.lo2iothub.lo;
+
+import com.orange.lo.sample.lo2iothub.utils.Counters;
+import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
+
+public class LoMqttReconnectHandler implements DataManagementReconnectCallback {
+
+ private final Counters counters;
+
+ public LoMqttReconnectHandler(Counters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void connectComplete(boolean b, String s) {
+ counters.setLoConnectionStatus(true);
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+ counters.setLoConnectionStatus(false);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpoint.java b/src/main/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpoint.java
index dbe5d57..9a6e995 100644
--- a/src/main/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpoint.java
+++ b/src/main/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpoint.java
@@ -7,24 +7,19 @@
package com.orange.lo.sample.lo2iothub.utils;
-import com.microsoft.azure.sdk.iot.device.MultiplexingClient;
-import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
-import com.orange.lo.sdk.LOApiClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.stereotype.Component;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
@Component
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {
- private final Set loApiClients = new HashSet<>();
- private final Map multiplexingClientStatus = new HashMap<>();
+ Counters counters;
+
+ public ConnectorHealthActuatorEndpoint(Counters counters) {
+ this.counters = counters;
+ }
@Override
public Health getHealth(boolean includeDetails) {
@@ -34,21 +29,9 @@ public Health getHealth(boolean includeDetails) {
@Override
public Health health() {
Health.Builder builder = new Health.Builder(Status.UP);
- boolean allConnected = loApiClients.stream().allMatch(c -> c.getDataManagementFifo().isConnected());
- builder.withDetail("loMqttConnectionStatus", allConnected);
-
- boolean cloudConnectionStatus = multiplexingClientStatus.values()
- .stream().allMatch(cs -> cs.equals(IotHubConnectionStatus.CONNECTED));
- builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);
+ builder.withDetail("loMqttConnectionStatus", counters.isLoConnectionStatusUp());
+ builder.withDetail("cloudConnectionStatus", counters.isCloudConnectionStatusUp());
return builder.build();
}
-
- public void addLoApiClient(LOApiClient loApiClient) {
- this.loApiClients.add(loApiClient);
- }
-
- public void addMultiplexingConnectionStatus(MultiplexingClient multiplexingClient, IotHubConnectionStatus iotHubConnectionStatus) {
- this.multiplexingClientStatus.put(multiplexingClient, iotHubConnectionStatus);
- }
}
\ No newline at end of file
diff --git a/src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java b/src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java
index ec584c7..2656bfb 100644
--- a/src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java
+++ b/src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java
@@ -9,14 +9,13 @@
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.step.StepMeterRegistry;
@Component
public class Counters {
@@ -26,6 +25,8 @@ public class Counters {
private final Counter mesasageSentAttemptFailedCounter;
private final Counter mesasageSentCounter;
private final Counter mesasageSentFailedCounter;
+ private AtomicInteger loConnectionStatus;
+ private AtomicInteger cloudConnectionStatus;
public Counters(@Qualifier("counters") MeterRegistry meterRegistry) {
mesasageReadCounter = meterRegistry.counter("message.read");
@@ -33,6 +34,8 @@ public Counters(@Qualifier("counters") MeterRegistry meterRegistry) {
mesasageSentAttemptFailedCounter = meterRegistry.counter("message.sent.attempt.failed");
mesasageSentCounter = meterRegistry.counter("message.sent");
mesasageSentFailedCounter = meterRegistry.counter("message.sent.failed");
+ loConnectionStatus = meterRegistry.gauge("status.connection.lo", new AtomicInteger(1));
+ cloudConnectionStatus = meterRegistry.gauge("status.connection.cloud", new AtomicInteger(1));
}
public Counter getMesasageReadCounter() {
@@ -55,6 +58,22 @@ public Counter getMesasageSentFailedCounter() {
return mesasageSentFailedCounter;
}
+ public void setLoConnectionStatus(boolean status) {
+ loConnectionStatus.set(status ? 1 : 0);
+ }
+
+ public void setCloudConnectionStatus(boolean status) {
+ cloudConnectionStatus.set(status ? 1 : 0);
+ }
+
+ public boolean isCloudConnectionStatusUp() {
+ return cloudConnectionStatus.get() > 0;
+ }
+
+ public boolean isLoConnectionStatusUp() {
+ return loConnectionStatus.get() > 0;
+ }
+
public List getAll() {
return Arrays.asList(mesasageReadCounter, mesasageSentAttemptCounter, mesasageSentAttemptFailedCounter, mesasageSentCounter, mesasageSentFailedCounter);
}
diff --git a/src/main/java/com/orange/lo/sample/lo2iothub/utils/MeterRegistryConfig.java b/src/main/java/com/orange/lo/sample/lo2iothub/utils/MeterRegistryConfig.java
index 7f7fb02..555ca0c 100644
--- a/src/main/java/com/orange/lo/sample/lo2iothub/utils/MeterRegistryConfig.java
+++ b/src/main/java/com/orange/lo/sample/lo2iothub/utils/MeterRegistryConfig.java
@@ -11,6 +11,8 @@
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.step.StepMeterRegistry;
@@ -30,15 +32,19 @@
import java.lang.invoke.MethodHandles;
import java.time.Duration;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
@Configuration
public class MeterRegistryConfig {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String AWS_SERVICE_PROFILE_NAME = "service-profile";
+ public static final String MESSAGE_METRICS_PREFIX = "message";
+ public static final String STATUS_METRICS_PREFIX = "status";
private final MetricsProperties metricsProperties;
public MeterRegistryConfig(MetricsProperties metricsProperties) {
@@ -66,7 +72,7 @@ private CloudWatchMeterRegistry getCloudWatchMeterRegistry() {
CloudWatchMeterRegistry cloudWatchMeterRegistry = new CloudWatchMeterRegistry(cloudWatchConfig(), Clock.SYSTEM, cloudWatchAsyncClient);
cloudWatchMeterRegistry.config()
- .meterFilter(MeterFilter.deny(id -> !id.getName().startsWith("message")))
+ .meterFilter(MeterFilter.deny(id -> !isAConnectorMetric(id)))
.commonTags(metricsProperties.getDimensionName(), metricsProperties.getDimensionValue());
return cloudWatchMeterRegistry;
}
@@ -98,6 +104,11 @@ public String namespace() {
};
}
+ private static boolean isAConnectorMetric(Meter.Id id) {
+ String name = id.getName();
+ return name.startsWith(MESSAGE_METRICS_PREFIX) || name.startsWith(STATUS_METRICS_PREFIX);
+ }
+
private StepMeterRegistry stepMeterRegistry() {
StepMeterRegistry stepMeterRegistry = new StepMeterRegistry(stepRegistryConfig(), Clock.SYSTEM) {
@@ -108,10 +119,22 @@ protected TimeUnit getBaseTimeUnit() {
@Override
protected void publish() {
- getMeters().stream()
- .filter(m -> m.getId().getName().startsWith("message"))
- .map(m -> get(m.getId().getName()).counter())
- .forEach(c -> LOG.info("{} = {}", c.getId().getName(), val(c)));
+ Stream message = getMeters().stream()
+ .filter(m -> m.getId().getName().startsWith(MESSAGE_METRICS_PREFIX))
+ .map(m -> {
+ Counter counter = get(m.getId().getName()).counter();
+ return counter.getId().getName() + " = " + val(counter);
+ });
+
+ Stream status = getMeters().stream()
+ .filter(m -> m.getId().getName().startsWith(STATUS_METRICS_PREFIX))
+ .map(m -> {
+ Gauge gauge = get(m.getId().getName()).gauge();
+ return gauge.getId().getName() + " = " + Math.round(gauge.value());
+ });
+
+ List collect = Stream.concat(message, status).toList();
+ collect.forEach(LOG::info);
}
@Override
diff --git a/src/test/java/com/orange/lo/sample/lo2iothub/ApplicationConfigTest.java b/src/test/java/com/orange/lo/sample/lo2iothub/ApplicationConfigTest.java
index f66d4bd..a5d2b66 100644
--- a/src/test/java/com/orange/lo/sample/lo2iothub/ApplicationConfigTest.java
+++ b/src/test/java/com/orange/lo/sample/lo2iothub/ApplicationConfigTest.java
@@ -36,7 +36,7 @@ class ApplicationConfigTest {
@BeforeEach
void setUp() {
- applicationConfig = new ApplicationConfig(counterProvider, applicationProperties, springJacksonConverter, connectorHealthActuatorEndpoint);
+ applicationConfig = new ApplicationConfig(counterProvider, applicationProperties, springJacksonConverter);
}
@Test
diff --git a/src/test/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandlerTest.java b/src/test/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandlerTest.java
new file mode 100644
index 0000000..01d048d
--- /dev/null
+++ b/src/test/java/com/orange/lo/sample/lo2iothub/lo/LoMqttReconnectHandlerTest.java
@@ -0,0 +1,60 @@
+package com.orange.lo.sample.lo2iothub.lo;
+
+import com.orange.lo.sample.lo2iothub.utils.Counters;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.step.StepMeterRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class LoMqttReconnectHandlerTest {
+
+ private Counters counters;
+ private LoMqttReconnectHandler loMqttReconnectHandler;
+
+ @BeforeEach
+ void setUp() {
+ MeterRegistry meterRegistry = mockMeterRegistry();
+ this.counters = new Counters(meterRegistry);
+ this.loMqttReconnectHandler = new LoMqttReconnectHandler(counters);
+ }
+
+ @Test
+ void shouldChangeLoConnectionStausWhenConnectComplete() {
+ loMqttReconnectHandler.connectComplete(false, "");
+
+ assertTrue(counters.isLoConnectionStatusUp());
+ }
+
+ @Test
+ void shouldChangeLoConnectionStausWhenConnectionLost() {
+ loMqttReconnectHandler.connectionLost(new Exception());
+
+ assertFalse(counters.isLoConnectionStatusUp());
+ }
+
+ private MeterRegistry mockMeterRegistry() {
+ StepMeterRegistry meterRegistry = mock(StepMeterRegistry.class);
+ when(meterRegistry.counter("message.read")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.attempt")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.attempt.failed")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.failed")).thenReturn(mock(Counter.class));
+ when(meterRegistry.gauge(eq("status.connection.lo"), any())).thenReturn(new AtomicInteger());
+ when(meterRegistry.gauge(eq("status.connection.cloud"), any())).thenReturn(new AtomicInteger());
+
+ return meterRegistry;
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpointTest.java b/src/test/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpointTest.java
index 2d1edf1..deea5cc 100644
--- a/src/test/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpointTest.java
+++ b/src/test/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpointTest.java
@@ -7,46 +7,92 @@
package com.orange.lo.sample.lo2iothub.utils;
-import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
-import com.microsoft.azure.sdk.iot.device.MultiplexingClient;
-import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.step.StepMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.actuate.health.Health;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@ExtendWith(MockitoExtension.class)
class ConnectorHealthActuatorEndpointTest {
+ private Counters counters;
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
+
@BeforeEach
void setUp() {
- this.connectorHealthActuatorEndpoint = new ConnectorHealthActuatorEndpoint();
+ MeterRegistry meterRegistry = mockMeterRegistry();
+ this.counters = new Counters(meterRegistry);
+ this.connectorHealthActuatorEndpoint = new ConnectorHealthActuatorEndpoint(counters);
}
@ParameterizedTest
@MethodSource("provideTestData")
- void checkConnectionStatus(
- MultiplexingClient multiplexingClient, IotHubConnectionStatus iotHubConnectionStatus, boolean isConnected) {
+ void checkCloudConnectionStatus(boolean isConnected) {
// when
- connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(multiplexingClient, iotHubConnectionStatus);
+ counters.setCloudConnectionStatus(isConnected);
boolean cloudConnectionStatus = (boolean) connectorHealthActuatorEndpoint.health().getDetails()
.get("cloudConnectionStatus");
// then
- assertEquals(cloudConnectionStatus, isConnected);
+ assertEquals(isConnected, cloudConnectionStatus);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideTestData")
+ void checkLoConnectionStatus(boolean isConnected) {
+ // when
+ counters.setLoConnectionStatus(isConnected);
+ boolean loMqttConnectionStatus = (boolean) connectorHealthActuatorEndpoint.health().getDetails()
+ .get("loMqttConnectionStatus");
+
+ // then
+ assertEquals(isConnected, loMqttConnectionStatus);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideTestData")
+ void checkHealth(boolean includeDetails) {
+ // when
+ Health health = connectorHealthActuatorEndpoint.getHealth(includeDetails);
+ Map details = health.getDetails();
+ int expectedDetailsSize = includeDetails ? 2 : 0;
+
+ // then
+ assertEquals(expectedDetailsSize, details.size());
}
private static Stream provideTestData() {
- return Stream.of(Arguments.of(new MultiplexingClient("liveobjects.orange-business.com", IotHubClientProtocol.AMQPS),
- IotHubConnectionStatus.CONNECTED, true),
- Arguments.of(new MultiplexingClient("liveobjects.orange-business.com", IotHubClientProtocol.AMQPS),
- IotHubConnectionStatus.DISCONNECTED, false),
- Arguments.of(new MultiplexingClient("liveobjects.orange-business.com", IotHubClientProtocol.AMQPS),
- IotHubConnectionStatus.DISCONNECTED_RETRYING, false));
+ return Stream.of(Arguments.of(true),
+ Arguments.of(false));
+ }
+
+ private MeterRegistry mockMeterRegistry() {
+ StepMeterRegistry meterRegistry = mock(StepMeterRegistry.class);
+ when(meterRegistry.counter("message.read")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.attempt")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.attempt.failed")).thenReturn(mock(Counter.class));
+ when(meterRegistry.counter("message.sent.failed")).thenReturn(mock(Counter.class));
+ when(meterRegistry.gauge(eq("status.connection.lo"), any())).thenReturn(new AtomicInteger());
+ when(meterRegistry.gauge(eq("status.connection.cloud"), any())).thenReturn(new AtomicInteger());
+
+ return meterRegistry;
}
}
\ No newline at end of file