Skip to content

Commit 981f375

Browse files
Connection status to aws metrics
1 parent 5b71bc2 commit 981f375

File tree

11 files changed

+144
-80
lines changed

11 files changed

+144
-80
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>com.github.DatavenueLiveObjects</groupId>
3030
<artifactId>FIFO_SDK_LiveObjects</artifactId>
31-
<version>v2.0.1</version>
31+
<version>v2.0.2</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>org.springframework.boot</groupId>

src/main/java/com/orange/lo/sample/lo2iothub/ApplicationConfig.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@
1010
import com.fasterxml.jackson.databind.ObjectMapper;
1111
import com.google.common.collect.Lists;
1212
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
13-
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
1413
import com.microsoft.azure.sdk.iot.service.registry.RegistryClient;
1514
import com.microsoft.azure.sdk.iot.service.twin.TwinClient;
1615
import com.orange.lo.sample.lo2iothub.azure.*;
1716
import com.orange.lo.sample.lo2iothub.exceptions.InitializationException;
1817
import com.orange.lo.sample.lo2iothub.lo.LiveObjectsProperties;
1918
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
2019
import com.orange.lo.sample.lo2iothub.lo.LoCommandSender;
21-
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
20+
import com.orange.lo.sample.lo2iothub.lo.LoMqttReconnectHandler;
2221
import com.orange.lo.sample.lo2iothub.utils.Counters;
2322
import com.orange.lo.sdk.LOApiClient;
2423
import com.orange.lo.sdk.LOApiClientParameters;
24+
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
2525
import com.orange.lo.sdk.rest.model.Device;
2626
import com.orange.lo.sdk.rest.model.Group;
2727

@@ -57,13 +57,11 @@ public class ApplicationConfig {
5757
private Counters counters;
5858
private ApplicationProperties applicationProperties;
5959
private ObjectMapper objectMapper;
60-
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
6160

62-
public ApplicationConfig(Counters counterProvider, ApplicationProperties applicationProperties, MappingJackson2HttpMessageConverter springJacksonConverter,
63-
ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
61+
public ApplicationConfig(Counters counterProvider, ApplicationProperties applicationProperties,
62+
MappingJackson2HttpMessageConverter springJacksonConverter) {
6463
this.counters = counterProvider;
6564
this.applicationProperties = applicationProperties;
66-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
6765
this.objectMapper = springJacksonConverter.getObjectMapper();
6866
}
6967

@@ -92,21 +90,20 @@ public Object init() {
9290

9391
IoTDeviceProvider ioTDeviceProvider = createIotDeviceProvider(azureIotHubProperties);
9492

95-
DevicesManager deviceClientManager = new DevicesManager(azureIotHubProperties, connectorHealthActuatorEndpoint, ioTDeviceProvider, counters);
93+
DevicesManager deviceClientManager = new DevicesManager(azureIotHubProperties, ioTDeviceProvider, counters);
9694

9795
IotHubAdapter iotHubAdapter = new IotHubAdapter(
9896
ioTDeviceProvider,
9997
deviceClientManager,
10098
liveObjectsProperties.isDeviceSynchronization(),
101-
connectorHealthActuatorEndpoint
99+
counters
102100
);
103101
MessageHandler dataManagementFifoCallback = new MessageHandler(iotHubAdapter, counters);
102+
DataManagementReconnectCallback reconnectHandler = new LoMqttReconnectHandler(counters);
104103
LOApiClientParameters loApiClientParameters = loApiClientParameters(liveObjectsProperties,
105-
azureIotHubProperties, dataManagementFifoCallback);
104+
azureIotHubProperties, dataManagementFifoCallback, reconnectHandler);
106105
LOApiClient loApiClient = new LOApiClient(loApiClientParameters);
107106

108-
connectorHealthActuatorEndpoint.addLoApiClient(loApiClient);
109-
110107

111108
boolean problemWithConnection = false;
112109
try {
@@ -130,7 +127,8 @@ public Object init() {
130127
} catch (Exception e) {
131128
LOG.error("Problem with connection. Check iot hub credentials", e);
132129
problemWithConnection = true;
133-
connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
130+
// connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
131+
counters.setCloudConnectionStatus(false);
134132
}
135133

136134
if (!problemWithConnection) {
@@ -157,7 +155,9 @@ private IoTDeviceProvider createIotDeviceProvider(AzureIotHubProperties azureIot
157155
}
158156

159157
private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProperties,
160-
AzureIotHubProperties azureIotHubProperties, MessageHandler dataManagementFifoCallback) {
158+
AzureIotHubProperties azureIotHubProperties,
159+
MessageHandler dataManagementFifoCallback,
160+
DataManagementReconnectCallback reconnectHandler) {
161161

162162
List<String> topics = Lists.newArrayList(azureIotHubProperties.getLoMessagesTopic());
163163
if (loProperties.isDeviceSynchronization()) {
@@ -177,6 +177,7 @@ private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProp
177177
.dataManagementMqttCallback(dataManagementFifoCallback)
178178
.connectorType(loProperties.getConnectorType())
179179
.connectorVersion(getConnectorVersion())
180+
.dataManagementReconnectCallback(reconnectHandler)
180181
.build();
181182
}
182183

src/main/java/com/orange/lo/sample/lo2iothub/MessageHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
import com.orange.lo.sample.lo2iothub.azure.IotHubAdapter;
1111
import com.orange.lo.sample.lo2iothub.exceptions.DeviceSynchronizationException;
12-
import com.orange.lo.sample.lo2iothub.exceptions.IotDeviceProviderException;
1312
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
1413
import com.orange.lo.sample.lo2iothub.utils.Counters;
1514
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
1615

1716
import java.lang.invoke.MethodHandles;
18-
import java.util.Optional;
1917

2018
import org.slf4j.Logger;
2119
import org.slf4j.LoggerFactory;

src/main/java/com/orange/lo/sample/lo2iothub/azure/DevicesManager.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import java.util.concurrent.TimeoutException;
2121
import java.util.stream.Collectors;
2222

23-
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
2423
import com.orange.lo.sample.lo2iothub.utils.Counters;
25-
import net.jodah.failsafe.Fallback;
26-
import net.jodah.failsafe.RetryPolicy;
2724
import org.slf4j.Logger;
2825
import org.slf4j.LoggerFactory;
2926

@@ -34,14 +31,12 @@ public class DevicesManager {
3431
private List<MultiplexingClientManager> multiplexingClientManagerList;
3532
private LoCommandSender loCommandSender;
3633

37-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
3834
private final IoTDeviceProvider ioTDeviceProvider;
3935
private Counters counterProvider;
4036
private LoAdapter loAdapter;
4137

42-
public DevicesManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) throws IotHubClientException {
38+
public DevicesManager(AzureIotHubProperties azureIotHubProperties, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) throws IotHubClientException {
4339
this.azureIotHubProperties = azureIotHubProperties;
44-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
4540
this.ioTDeviceProvider = ioTDeviceProvider;
4641
this.counterProvider = counterProvider;
4742
this.multiplexingClientManagerList = Collections.synchronizedList(new LinkedList<>());
@@ -95,7 +90,7 @@ private synchronized MultiplexingClientManager getFreeMultiplexingClientManager(
9590
return multiplexingClientManager;
9691
}
9792
}
98-
MultiplexingClientManager freeMultiplexingClientManager = new MultiplexingClientManager(azureIotHubProperties, connectorHealthActuatorEndpoint, ioTDeviceProvider);
93+
MultiplexingClientManager freeMultiplexingClientManager = new MultiplexingClientManager(azureIotHubProperties, counterProvider, ioTDeviceProvider);
9994
freeMultiplexingClientManager.makeReservation();
10095
multiplexingClientManagerList.add(freeMultiplexingClientManager);
10196
return freeMultiplexingClientManager;

src/main/java/com/orange/lo/sample/lo2iothub/azure/IotHubAdapter.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
package com.orange.lo.sample.lo2iothub.azure;
99

1010
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
11-
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
1211
import com.microsoft.azure.sdk.iot.service.registry.Device;
1312
import com.orange.lo.sample.lo2iothub.exceptions.DeviceSynchronizationException;
14-
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
13+
import com.orange.lo.sample.lo2iothub.utils.Counters;
1514
import org.slf4j.Logger;
1615
import org.slf4j.LoggerFactory;
1716

@@ -27,13 +26,13 @@ public class IotHubAdapter {
2726
private final IoTDeviceProvider ioTDeviceProvider;
2827
private final boolean deviceSynchronization;
2928
private final DevicesManager devicesManager;
30-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
29+
private final Counters counters;
3130

32-
public IotHubAdapter(IoTDeviceProvider ioTDeviceProvider, DevicesManager deviceClientManager, boolean deviceSynchronization, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
31+
public IotHubAdapter(IoTDeviceProvider ioTDeviceProvider, DevicesManager deviceClientManager, boolean deviceSynchronization, Counters counters) {
3332
this.ioTDeviceProvider = ioTDeviceProvider;
3433
this.devicesManager = deviceClientManager;
3534
this.deviceSynchronization = deviceSynchronization;
36-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
35+
this.counters = counters;
3736
}
3837

3938
public void sendMessage(String loClientId, int loMessageId, String message) {
@@ -65,7 +64,7 @@ public DeviceClientManager createOrGetDeviceClientManager(String deviceId) {
6564
device = ioTDeviceProvider.getDevice(deviceId);
6665
} catch (Exception e) {
6766
LOG.debug("Problem with connection. Check IoT Hub credentials ", e);
68-
connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
67+
counters.setCloudConnectionStatus(false);
6968
}
7069

7170
// no device in iot hub

src/main/java/com/orange/lo/sample/lo2iothub/azure/MultiplexingClientManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
1212
import com.microsoft.azure.sdk.iot.device.exceptions.MultiplexingClientRegistrationException;
1313
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
14-
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
14+
import com.orange.lo.sample.lo2iothub.utils.Counters;
1515
import net.jodah.failsafe.Failsafe;
1616
import net.jodah.failsafe.RetryPolicy;
1717
import org.slf4j.Logger;
@@ -36,7 +36,7 @@ public class MultiplexingClientManager implements IotHubConnectionStatusChangeCa
3636

3737
private final MultiplexingClient multiplexingClient;
3838
private final Map<String, DeviceClientManager> multiplexedDeviceClientManagers;
39-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
39+
private final Counters counters;
4040
private final IoTDeviceProvider ioTDeviceProvider;
4141
private final int multiplexingClientId;
4242
private IotHubConnectionStatus multiplexedConnectionStatus;
@@ -46,10 +46,10 @@ public class MultiplexingClientManager implements IotHubConnectionStatusChangeCa
4646
private ScheduledExecutorService registerTaskScheduler = Executors.newScheduledThreadPool(1);
4747
private int reservations;
4848

49-
public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider) {
49+
public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, Counters counters, IoTDeviceProvider ioTDeviceProvider) {
5050
this.multiplexingClient = new MultiplexingClient(azureIotHubProperties.getIotHostName(), IotHubClientProtocol.AMQPS, null);
5151
this.multiplexedDeviceClientManagers = new HashMap<>();
52-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
52+
this.counters = counters;
5353
this.ioTDeviceProvider = ioTDeviceProvider;
5454
this.multiplexingClientId = multiplexingClientIndex.getAndIncrement();
5555
this.multiplexingClient.setConnectionStatusChangeCallback(this, this);
@@ -67,7 +67,7 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange
6767
IotHubConnectionStatusChangeReason statusChangeReason = connectionStatusChangeContext.getNewStatusReason();
6868
Throwable throwable = connectionStatusChangeContext.getCause();
6969

70-
connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(multiplexingClient, multiplexedConnectionStatus);
70+
counters.setCloudConnectionStatus(IotHubConnectionStatus.CONNECTED.equals(multiplexedConnectionStatus));
7171

7272
if (throwable == null) {
7373
LOG.info("Connection status changed for multiplexing client: {}, status: {}, reason: {}", multiplexingClientId, multiplexedConnectionStatus, statusChangeReason);

src/main/java/com/orange/lo/sample/lo2iothub/utils/ConnectorHealthActuatorEndpoint.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,19 @@
77

88
package com.orange.lo.sample.lo2iothub.utils;
99

10-
import com.microsoft.azure.sdk.iot.device.MultiplexingClient;
11-
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
12-
import com.orange.lo.sdk.LOApiClient;
1310
import org.springframework.boot.actuate.health.Health;
1411
import org.springframework.boot.actuate.health.HealthIndicator;
1512
import org.springframework.boot.actuate.health.Status;
1613
import org.springframework.stereotype.Component;
1714

18-
import java.util.HashMap;
19-
import java.util.HashSet;
20-
import java.util.Map;
21-
import java.util.Set;
22-
2315
@Component
2416
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {
2517

26-
private final Set<LOApiClient> loApiClients = new HashSet<>();
27-
private final Map<MultiplexingClient, IotHubConnectionStatus> multiplexingClientStatus = new HashMap<>();
18+
Counters counters;
19+
20+
public ConnectorHealthActuatorEndpoint(Counters counters) {
21+
this.counters = counters;
22+
}
2823

2924
@Override
3025
public Health getHealth(boolean includeDetails) {
@@ -34,21 +29,9 @@ public Health getHealth(boolean includeDetails) {
3429
@Override
3530
public Health health() {
3631
Health.Builder builder = new Health.Builder(Status.UP);
37-
boolean allConnected = loApiClients.stream().allMatch(c -> c.getDataManagementFifo().isConnected());
38-
builder.withDetail("loMqttConnectionStatus", allConnected);
39-
40-
boolean cloudConnectionStatus = multiplexingClientStatus.values()
41-
.stream().allMatch(cs -> cs.equals(IotHubConnectionStatus.CONNECTED));
42-
builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);
4332

33+
builder.withDetail("loMqttConnectionStatus", counters.isLoConnectionStatusUp());
34+
builder.withDetail("cloudConnectionStatus", counters.isCloudConnectionStatusUp());
4435
return builder.build();
4536
}
46-
47-
public void addLoApiClient(LOApiClient loApiClient) {
48-
this.loApiClients.add(loApiClient);
49-
}
50-
51-
public void addMultiplexingConnectionStatus(MultiplexingClient multiplexingClient, IotHubConnectionStatus iotHubConnectionStatus) {
52-
this.multiplexingClientStatus.put(multiplexingClient, iotHubConnectionStatus);
53-
}
5437
}

src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@
99

1010
import java.util.Arrays;
1111
import java.util.List;
12-
import java.util.concurrent.Executors;
12+
import java.util.concurrent.atomic.AtomicInteger;
1313

1414
import io.micrometer.core.instrument.MeterRegistry;
1515
import org.springframework.beans.factory.annotation.Qualifier;
1616
import org.springframework.stereotype.Component;
1717

1818
import io.micrometer.core.instrument.Counter;
19-
import io.micrometer.core.instrument.step.StepMeterRegistry;
2019

2120
@Component
2221
public class Counters {
@@ -26,13 +25,17 @@ public class Counters {
2625
private final Counter mesasageSentAttemptFailedCounter;
2726
private final Counter mesasageSentCounter;
2827
private final Counter mesasageSentFailedCounter;
28+
private AtomicInteger loConnectionStatus;
29+
private AtomicInteger cloudConnectionStatus;
2930

3031
public Counters(@Qualifier("counters") MeterRegistry meterRegistry) {
3132
mesasageReadCounter = meterRegistry.counter("message.read");
3233
mesasageSentAttemptCounter = meterRegistry.counter("message.sent.attempt");
3334
mesasageSentAttemptFailedCounter = meterRegistry.counter("message.sent.attempt.failed");
3435
mesasageSentCounter = meterRegistry.counter("message.sent");
3536
mesasageSentFailedCounter = meterRegistry.counter("message.sent.failed");
37+
loConnectionStatus = meterRegistry.gauge("status.connection.lo", new AtomicInteger(1));
38+
cloudConnectionStatus = meterRegistry.gauge("status.connection.cloud", new AtomicInteger(1));
3639
}
3740

3841
public Counter getMesasageReadCounter() {
@@ -55,6 +58,22 @@ public Counter getMesasageSentFailedCounter() {
5558
return mesasageSentFailedCounter;
5659
}
5760

61+
public void setLoConnectionStatus(boolean status) {
62+
loConnectionStatus.set(status ? 1 : 0);
63+
}
64+
65+
public void setCloudConnectionStatus(boolean status) {
66+
cloudConnectionStatus.set(status ? 1 : 0);
67+
}
68+
69+
public boolean isCloudConnectionStatusUp() {
70+
return cloudConnectionStatus.get() > 0;
71+
}
72+
73+
public boolean isLoConnectionStatusUp() {
74+
return loConnectionStatus.get() > 0;
75+
}
76+
5877
public List<Counter> getAll() {
5978
return Arrays.asList(mesasageReadCounter, mesasageSentAttemptCounter, mesasageSentAttemptFailedCounter, mesasageSentCounter, mesasageSentFailedCounter);
6079
}

0 commit comments

Comments
 (0)