Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -32,13 +32,13 @@ public class EventHubClientFacade {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE = "The event hub client has not been initialized. Check the validity of your credentials";

private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final Counters counters;
private final EventHubProperties eventHubProperties;
private EventHubClient eventHubClient;

public EventHubClientFacade(EventHubProperties eventHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
public EventHubClientFacade(EventHubProperties eventHubProperties, Counters counters) {
this.eventHubProperties = eventHubProperties;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.counters = counters;
}

@PostConstruct
Expand All @@ -56,7 +56,7 @@ private void init() {
this.eventHubClient = EventHubClient.createFromConnectionStringSync(conn.toString(), executor);
} catch (Exception e) {
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
counters.setCloudConnectionStatus(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
Expand Down Expand Up @@ -39,18 +38,15 @@ public class EventHubSender {
private final Counters counters;
private final EventHubProperties eventHubProperties;
private final ExecutorService executorService;
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final LoService loService;
private final Queue<LoMessage> messageQueue;

EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
ExecutorService executorService, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
LoService loService, Queue<LoMessage> messageQueue) {
ExecutorService executorService, LoService loService, Queue<LoMessage> messageQueue) {
this.eventHubClientFacade = eventHubClientFacade;
this.counters = counters;
this.eventHubProperties = eventHubProperties;
this.executorService = executorService;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.loService = loService;
this.messageQueue = messageQueue;
}
Expand Down Expand Up @@ -79,10 +75,10 @@ public void send(List<LoMessage> messages) {
try {
List<String> messageContentList = messages.stream().map(LoMessage::message).toList();
eventHubClientFacade.sendSync(messageContentList);
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
counters.setCloudConnectionStatus(true);
} catch (EventHubClientFacadeException e) {
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
counters.setCloudConnectionStatus(false);
throw e;
}
});
Expand All @@ -94,7 +90,7 @@ private void checkConnection() {
eventHubClientFacade.sendSync(new byte[0]);
} catch (EventHubClientFacadeException e) {
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
counters.setCloudConnectionStatus(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@

package com.orange.lo.sample.mqtt2eventhub.liveobjects;

import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
import org.springframework.stereotype.Component;

@Component
public class LoMqttReconnectHandler implements DataManagementReconnectCallback {

private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final Counters counters;

public LoMqttReconnectHandler(ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
public LoMqttReconnectHandler(Counters counters) {
this.counters = counters;
}

@Override
public void connectComplete(boolean b, String s) {
connectorHealthActuatorEndpoint.setLoConnectionStatus(true);
counters.setLoConnectionStatus(true);
}

@Override
public void connectionLost(Throwable throwable) {
connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
counters.setLoConnectionStatus(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package com.orange.lo.sample.mqtt2eventhub.liveobjects;

import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
import com.orange.lo.sdk.LOApiClient;
import com.orange.lo.sdk.fifomqtt.DataManagementFifo;
import com.orange.lo.sdk.mqtt.exceptions.LoMqttException;
Expand All @@ -27,13 +27,13 @@ public class LoService {
private final DataManagementFifo dataManagementFifo;
private final LoProperties loProperties;

private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
private final Counters counters;

public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
public LoService(LOApiClient loApiClient, Counters counters,
LoProperties loProperties) {
this.dataManagementFifo = loApiClient.getDataManagementFifo();
this.loProperties = loProperties;
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
this.counters = counters;
}

@PostConstruct
Expand All @@ -42,10 +42,10 @@ public void start() {
dataManagementFifo.connect();
} catch (LoMqttException e) {
LOG.error("Problem with connection. Check LO credentials. ", e);
connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
counters.setLoConnectionStatus(false);
}

if (connectorHealthActuatorEndpoint.isCloudConnectionStatus() && connectorHealthActuatorEndpoint.isLoConnectionStatus())
if (counters.isCloudConnectionStatusUp() && counters.isLoConnectionStatusUp())
dataManagementFifo.connectAndSubscribe();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* Copyright (c) Orange, Inc. and its affiliates. All Rights Reserved.
* <p>
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.orange.lo.sample.mqtt2eventhub.utils;

import org.springframework.boot.actuate.health.Health;
Expand All @@ -8,8 +15,11 @@
@Component
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {

boolean cloudConnectionStatus = true;
boolean loConnectionStatus = true;
private Counters counters;

public ConnectorHealthActuatorEndpoint(Counters counters) {
this.counters = counters;
}

@Override
public Health getHealth(boolean includeDetails) {
Expand All @@ -20,24 +30,8 @@ public Health getHealth(boolean includeDetails) {
public Health health() {
Health.Builder builder = new Health.Builder(Status.UP);

builder.withDetail("loMqttConnectionStatus", loConnectionStatus);
builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);
builder.withDetail("loMqttConnectionStatus", counters.isLoConnectionStatusUp());
builder.withDetail("cloudConnectionStatus", counters.isCloudConnectionStatusUp());
return builder.build();
}

public void setCloudConnectionStatus(boolean cloudConnectionStatus) {
this.cloudConnectionStatus = cloudConnectionStatus;
}

public boolean isCloudConnectionStatus() {
return cloudConnectionStatus;
}

public void setLoConnectionStatus(boolean cloudConnectionStatus) {
this.loConnectionStatus = cloudConnectionStatus;
}

public boolean isLoConnectionStatus() {
return loConnectionStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class Counters {
Expand All @@ -22,13 +23,17 @@ public class Counters {
private final Counter mesasageSentAttemptFailedCounter;
private final Counter mesasageSentCounter;
private final Counter mesasageSentFailedCounter;
private AtomicInteger loConnectionStatus;
private AtomicInteger cloudConnectionStatus;

public Counters(MeterRegistry meterRegistry) {
mesasageReadCounter = meterRegistry.counter("message.read");
mesasageSentAttemptCounter = meterRegistry.counter("message.sent.attempt");
mesasageSentAttemptFailedCounter = meterRegistry.counter("message.sent.attempt.failed");
mesasageSentCounter = meterRegistry.counter("message.sent");
mesasageSentFailedCounter = meterRegistry.counter("message.sent.failed");
loConnectionStatus = meterRegistry.gauge("status.connection.lo", new AtomicInteger(1));
cloudConnectionStatus = meterRegistry.gauge("status.connection.cloud", new AtomicInteger(1));
}

public Counter getMesasageReadCounter() {
Expand All @@ -50,7 +55,23 @@ public Counter getMesasageSentCounter() {
public Counter getMesasageSentFailedCounter() {
return mesasageSentFailedCounter;
}


public void setLoConnectionStatus(boolean status) {
loConnectionStatus.set(status ? 1 : 0);
}

public void setCloudConnectionStatus(boolean status) {
cloudConnectionStatus.set(status ? 1 : 0);
}

public boolean isCloudConnectionStatusUp() {
return cloudConnectionStatus.get() > 0;
}

public boolean isLoConnectionStatusUp() {
return loConnectionStatus.get() > 0;
}

public List<Counter> getAll() {
return Arrays.asList(mesasageReadCounter, mesasageSentAttemptCounter, mesasageSentAttemptFailedCounter, mesasageSentCounter, mesasageSentFailedCounter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.step.StepMeterRegistry;
Expand All @@ -30,15 +32,20 @@

import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Configuration
public class MeterRegistryConfig {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String AWS_SERVICE_PROFILE_NAME = "service-profile";
public static final String MESSAGE_METRICS_PREFIX = "message";
public static final String STATUS_METRICS_PREFIX = "status";
private final MetricsProperties metricsProperties;

public MeterRegistryConfig(MetricsProperties metricsProperties) {
Expand Down Expand Up @@ -66,7 +73,7 @@ private CloudWatchMeterRegistry getCloudWatchMeterRegistry() {
CloudWatchMeterRegistry cloudWatchMeterRegistry = new CloudWatchMeterRegistry(cloudWatchConfig(), Clock.SYSTEM, cloudWatchAsyncClient);

cloudWatchMeterRegistry.config()
.meterFilter(MeterFilter.deny(id -> !id.getName().startsWith("message")))
.meterFilter(MeterFilter.deny(id -> !isAConnectorMetric(id)))
.commonTags(metricsProperties.getDimensionName(), metricsProperties.getDimensionValue());
return cloudWatchMeterRegistry;
}
Expand Down Expand Up @@ -99,6 +106,11 @@ public String namespace() {
};
}

private static boolean isAConnectorMetric(Meter.Id id) {
String name = id.getName();
return name.startsWith(MESSAGE_METRICS_PREFIX) || name.startsWith(STATUS_METRICS_PREFIX);
}

private StepMeterRegistry stepMeterRegistry() {
StepMeterRegistry stepMeterRegistry = new StepMeterRegistry(stepRegistryConfig(), Clock.SYSTEM) {

Expand All @@ -110,10 +122,22 @@ protected TimeUnit getBaseTimeUnit() {

@Override
protected void publish() {
getMeters().stream()
.filter(m -> m.getId().getName().startsWith("message") )
.map(m -> get(m.getId().getName()).counter())
.forEach(c -> LOG.info("{} = {}", c.getId().getName(), val(c)));
Stream<String> message = getMeters().stream()
.filter(m -> m.getId().getName().startsWith(MESSAGE_METRICS_PREFIX))
.map(m -> {
Counter counter = get(m.getId().getName()).counter();
return counter.getId().getName() + " = " + val(counter);
});

Stream<String> status = getMeters().stream()
.filter(m -> m.getId().getName().startsWith(STATUS_METRICS_PREFIX))
.map(m -> {
Gauge gauge = get(m.getId().getName()).gauge();
return gauge.getId().getName() + " = " + Math.round(gauge.value());
});

List<String> collect = Stream.concat(message, status).collect(Collectors.toList());
collect.forEach(LOG::info);
}

@Override
Expand Down
Loading