Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<dependency>
<groupId>com.github.DatavenueLiveObjects</groupId>
<artifactId>FIFO_SDK_LiveObjects</artifactId>
<version>v2.0.1</version>
<version>v2.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
27 changes: 14 additions & 13 deletions src/main/java/com/orange/lo/sample/lo2iothub/ApplicationConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.service.registry.RegistryClient;
import com.microsoft.azure.sdk.iot.service.twin.TwinClient;
import com.orange.lo.sample.lo2iothub.azure.*;
import com.orange.lo.sample.lo2iothub.exceptions.InitializationException;
import com.orange.lo.sample.lo2iothub.lo.LiveObjectsProperties;
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
import com.orange.lo.sample.lo2iothub.lo.LoCommandSender;
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.lo2iothub.lo.LoMqttReconnectHandler;
import com.orange.lo.sample.lo2iothub.utils.Counters;
import com.orange.lo.sdk.LOApiClient;
import com.orange.lo.sdk.LOApiClientParameters;
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
import com.orange.lo.sdk.rest.model.Device;
import com.orange.lo.sdk.rest.model.Group;

Expand Down Expand Up @@ -57,13 +57,11 @@ public class ApplicationConfig {
private Counters counters;
private ApplicationProperties applicationProperties;
private ObjectMapper objectMapper;
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;

public ApplicationConfig(Counters counterProvider, ApplicationProperties applicationProperties, MappingJackson2HttpMessageConverter springJacksonConverter,
ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
public ApplicationConfig(Counters counterProvider, ApplicationProperties applicationProperties,
MappingJackson2HttpMessageConverter springJacksonConverter) {
this.counters = counterProvider;
this.applicationProperties = applicationProperties;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.objectMapper = springJacksonConverter.getObjectMapper();
}

Expand Down Expand Up @@ -92,21 +90,20 @@ public Object init() {

IoTDeviceProvider ioTDeviceProvider = createIotDeviceProvider(azureIotHubProperties);

DevicesManager deviceClientManager = new DevicesManager(azureIotHubProperties, connectorHealthActuatorEndpoint, ioTDeviceProvider, counters);
DevicesManager deviceClientManager = new DevicesManager(azureIotHubProperties, ioTDeviceProvider, counters);

IotHubAdapter iotHubAdapter = new IotHubAdapter(
ioTDeviceProvider,
deviceClientManager,
liveObjectsProperties.isDeviceSynchronization(),
connectorHealthActuatorEndpoint
counters
);
MessageHandler dataManagementFifoCallback = new MessageHandler(iotHubAdapter, counters);
DataManagementReconnectCallback reconnectHandler = new LoMqttReconnectHandler(counters);
LOApiClientParameters loApiClientParameters = loApiClientParameters(liveObjectsProperties,
azureIotHubProperties, dataManagementFifoCallback);
azureIotHubProperties, dataManagementFifoCallback, reconnectHandler);
LOApiClient loApiClient = new LOApiClient(loApiClientParameters);

connectorHealthActuatorEndpoint.addLoApiClient(loApiClient);


boolean problemWithConnection = false;
try {
Expand All @@ -130,7 +127,8 @@ public Object init() {
} catch (Exception e) {
LOG.error("Problem with connection. Check iot hub credentials", e);
problemWithConnection = true;
connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
// connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
counters.setCloudConnectionStatus(false);
}

if (!problemWithConnection) {
Expand All @@ -157,7 +155,9 @@ private IoTDeviceProvider createIotDeviceProvider(AzureIotHubProperties azureIot
}

private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProperties,
AzureIotHubProperties azureIotHubProperties, MessageHandler dataManagementFifoCallback) {
AzureIotHubProperties azureIotHubProperties,
MessageHandler dataManagementFifoCallback,
DataManagementReconnectCallback reconnectHandler) {

List<String> topics = Lists.newArrayList(azureIotHubProperties.getLoMessagesTopic());
if (loProperties.isDeviceSynchronization()) {
Expand All @@ -177,6 +177,7 @@ private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProp
.dataManagementMqttCallback(dataManagementFifoCallback)
.connectorType(loProperties.getConnectorType())
.connectorVersion(getConnectorVersion())
.dataManagementReconnectCallback(reconnectHandler)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

import com.orange.lo.sample.lo2iothub.azure.IotHubAdapter;
import com.orange.lo.sample.lo2iothub.exceptions.DeviceSynchronizationException;
import com.orange.lo.sample.lo2iothub.exceptions.IotDeviceProviderException;
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
import com.orange.lo.sample.lo2iothub.utils.Counters;
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;

import java.lang.invoke.MethodHandles;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.lo2iothub.utils.Counters;
import net.jodah.failsafe.Fallback;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,14 +31,12 @@ public class DevicesManager {
private List<MultiplexingClientManager> multiplexingClientManagerList;
private LoCommandSender loCommandSender;

private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final IoTDeviceProvider ioTDeviceProvider;
private Counters counterProvider;
private LoAdapter loAdapter;

public DevicesManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) throws IotHubClientException {
public DevicesManager(AzureIotHubProperties azureIotHubProperties, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) throws IotHubClientException {
this.azureIotHubProperties = azureIotHubProperties;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.ioTDeviceProvider = ioTDeviceProvider;
this.counterProvider = counterProvider;
this.multiplexingClientManagerList = Collections.synchronizedList(new LinkedList<>());
Expand Down Expand Up @@ -95,7 +90,7 @@ private synchronized MultiplexingClientManager getFreeMultiplexingClientManager(
return multiplexingClientManager;
}
}
MultiplexingClientManager freeMultiplexingClientManager = new MultiplexingClientManager(azureIotHubProperties, connectorHealthActuatorEndpoint, ioTDeviceProvider);
MultiplexingClientManager freeMultiplexingClientManager = new MultiplexingClientManager(azureIotHubProperties, counterProvider, ioTDeviceProvider);
freeMultiplexingClientManager.makeReservation();
multiplexingClientManagerList.add(freeMultiplexingClientManager);
return freeMultiplexingClientManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
package com.orange.lo.sample.lo2iothub.azure;

import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.microsoft.azure.sdk.iot.service.registry.Device;
import com.orange.lo.sample.lo2iothub.exceptions.DeviceSynchronizationException;
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.lo2iothub.utils.Counters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,13 +26,13 @@ public class IotHubAdapter {
private final IoTDeviceProvider ioTDeviceProvider;
private final boolean deviceSynchronization;
private final DevicesManager devicesManager;
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final Counters counters;

public IotHubAdapter(IoTDeviceProvider ioTDeviceProvider, DevicesManager deviceClientManager, boolean deviceSynchronization, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
public IotHubAdapter(IoTDeviceProvider ioTDeviceProvider, DevicesManager deviceClientManager, boolean deviceSynchronization, Counters counters) {
this.ioTDeviceProvider = ioTDeviceProvider;
this.devicesManager = deviceClientManager;
this.deviceSynchronization = deviceSynchronization;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.counters = counters;
}

public void sendMessage(String loClientId, int loMessageId, String message) {
Expand Down Expand Up @@ -65,7 +64,7 @@ public DeviceClientManager createOrGetDeviceClientManager(String deviceId) {
device = ioTDeviceProvider.getDevice(deviceId);
} catch (Exception e) {
LOG.debug("Problem with connection. Check IoT Hub credentials ", e);
connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(null, IotHubConnectionStatus.DISCONNECTED);
counters.setCloudConnectionStatus(false);
}

// no device in iot hub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
import com.microsoft.azure.sdk.iot.device.exceptions.MultiplexingClientRegistrationException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.orange.lo.sample.lo2iothub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.lo2iothub.utils.Counters;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
Expand All @@ -36,7 +36,7 @@ public class MultiplexingClientManager implements IotHubConnectionStatusChangeCa

private final MultiplexingClient multiplexingClient;
private final Map<String, DeviceClientManager> multiplexedDeviceClientManagers;
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final Counters counters;
private final IoTDeviceProvider ioTDeviceProvider;
private final int multiplexingClientId;
private IotHubConnectionStatus multiplexedConnectionStatus;
Expand All @@ -46,10 +46,10 @@ public class MultiplexingClientManager implements IotHubConnectionStatusChangeCa
private ScheduledExecutorService registerTaskScheduler = Executors.newScheduledThreadPool(1);
private int reservations;

public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider) {
public MultiplexingClientManager(AzureIotHubProperties azureIotHubProperties, Counters counters, IoTDeviceProvider ioTDeviceProvider) {
this.multiplexingClient = new MultiplexingClient(azureIotHubProperties.getIotHostName(), IotHubClientProtocol.AMQPS, null);
this.multiplexedDeviceClientManagers = new HashMap<>();
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.counters = counters;
this.ioTDeviceProvider = ioTDeviceProvider;
this.multiplexingClientId = multiplexingClientIndex.getAndIncrement();
this.multiplexingClient.setConnectionStatusChangeCallback(this, this);
Expand All @@ -67,7 +67,7 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange
IotHubConnectionStatusChangeReason statusChangeReason = connectionStatusChangeContext.getNewStatusReason();
Throwable throwable = connectionStatusChangeContext.getCause();

connectorHealthActuatorEndpoint.addMultiplexingConnectionStatus(multiplexingClient, multiplexedConnectionStatus);
counters.setCloudConnectionStatus(IotHubConnectionStatus.CONNECTED.equals(multiplexedConnectionStatus));

if (throwable == null) {
LOG.info("Connection status changed for multiplexing client: {}, status: {}, reason: {}", multiplexingClientId, multiplexedConnectionStatus, statusChangeReason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) Orange. All Rights Reserved.
* <p>
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.orange.lo.sample.lo2iothub.lo;

import com.orange.lo.sample.lo2iothub.utils.Counters;
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;

public class LoMqttReconnectHandler implements DataManagementReconnectCallback {

private final Counters counters;

public LoMqttReconnectHandler(Counters counters) {
this.counters = counters;
}

@Override
public void connectComplete(boolean b, String s) {
counters.setLoConnectionStatus(true);
}

@Override
public void connectionLost(Throwable throwable) {
counters.setLoConnectionStatus(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,19 @@

package com.orange.lo.sample.lo2iothub.utils;

import com.microsoft.azure.sdk.iot.device.MultiplexingClient;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
import com.orange.lo.sdk.LOApiClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@Component
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {

private final Set<LOApiClient> loApiClients = new HashSet<>();
private final Map<MultiplexingClient, IotHubConnectionStatus> multiplexingClientStatus = new HashMap<>();
Counters counters;

public ConnectorHealthActuatorEndpoint(Counters counters) {
this.counters = counters;
}

@Override
public Health getHealth(boolean includeDetails) {
Expand All @@ -34,21 +29,9 @@ public Health getHealth(boolean includeDetails) {
@Override
public Health health() {
Health.Builder builder = new Health.Builder(Status.UP);
boolean allConnected = loApiClients.stream().allMatch(c -> c.getDataManagementFifo().isConnected());
builder.withDetail("loMqttConnectionStatus", allConnected);

boolean cloudConnectionStatus = multiplexingClientStatus.values()
.stream().allMatch(cs -> cs.equals(IotHubConnectionStatus.CONNECTED));
builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);

builder.withDetail("loMqttConnectionStatus", counters.isLoConnectionStatusUp());
builder.withDetail("cloudConnectionStatus", counters.isCloudConnectionStatusUp());
return builder.build();
}

public void addLoApiClient(LOApiClient loApiClient) {
this.loApiClients.add(loApiClient);
}

public void addMultiplexingConnectionStatus(MultiplexingClient multiplexingClient, IotHubConnectionStatus iotHubConnectionStatus) {
this.multiplexingClientStatus.put(multiplexingClient, iotHubConnectionStatus);
}
}
23 changes: 21 additions & 2 deletions src/main/java/com/orange/lo/sample/lo2iothub/utils/Counters.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.step.StepMeterRegistry;

@Component
public class Counters {
Expand All @@ -26,13 +25,17 @@ public class Counters {
private final Counter mesasageSentAttemptFailedCounter;
private final Counter mesasageSentCounter;
private final Counter mesasageSentFailedCounter;
private AtomicInteger loConnectionStatus;
private AtomicInteger cloudConnectionStatus;

public Counters(@Qualifier("counters") MeterRegistry meterRegistry) {
mesasageReadCounter = meterRegistry.counter("message.read");
mesasageSentAttemptCounter = meterRegistry.counter("message.sent.attempt");
mesasageSentAttemptFailedCounter = meterRegistry.counter("message.sent.attempt.failed");
mesasageSentCounter = meterRegistry.counter("message.sent");
mesasageSentFailedCounter = meterRegistry.counter("message.sent.failed");
loConnectionStatus = meterRegistry.gauge("status.connection.lo", new AtomicInteger(1));
cloudConnectionStatus = meterRegistry.gauge("status.connection.cloud", new AtomicInteger(1));
}

public Counter getMesasageReadCounter() {
Expand All @@ -55,6 +58,22 @@ public Counter getMesasageSentFailedCounter() {
return mesasageSentFailedCounter;
}

public void setLoConnectionStatus(boolean status) {
loConnectionStatus.set(status ? 1 : 0);
}

public void setCloudConnectionStatus(boolean status) {
cloudConnectionStatus.set(status ? 1 : 0);
}

public boolean isCloudConnectionStatusUp() {
return cloudConnectionStatus.get() > 0;
}

public boolean isLoConnectionStatusUp() {
return loConnectionStatus.get() > 0;
}

public List<Counter> getAll() {
return Arrays.asList(mesasageReadCounter, mesasageSentAttemptCounter, mesasageSentAttemptFailedCounter, mesasageSentCounter, mesasageSentFailedCounter);
}
Expand Down
Loading