Skip to content

Commit 52fd451

Browse files
connection status stored in counters
1 parent 9d677d7 commit 52fd451

File tree

11 files changed

+172
-97
lines changed

11 files changed

+172
-97
lines changed

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import com.microsoft.azure.eventhubs.EventData;
1212
import com.microsoft.azure.eventhubs.EventHubClient;
1313
import com.microsoft.azure.eventhubs.EventHubException;
14-
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
14+
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717
import org.springframework.stereotype.Component;
@@ -32,13 +32,13 @@ public class EventHubClientFacade {
3232
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3333
public static final String EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE = "The event hub client has not been initialized. Check the validity of your credentials";
3434

35-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
35+
private final Counters counters;
3636
private final EventHubProperties eventHubProperties;
3737
private EventHubClient eventHubClient;
3838

39-
public EventHubClientFacade(EventHubProperties eventHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
39+
public EventHubClientFacade(EventHubProperties eventHubProperties, Counters counters) {
4040
this.eventHubProperties = eventHubProperties;
41-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
41+
this.counters = counters;
4242
}
4343

4444
@PostConstruct
@@ -56,7 +56,7 @@ private void init() {
5656
this.eventHubClient = EventHubClient.createFromConnectionStringSync(conn.toString(), executor);
5757
} catch (Exception e) {
5858
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
59-
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
59+
counters.setCloudConnectionStatus(false);
6060
}
6161
}
6262

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
1111
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
12-
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
1312
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
1413
import net.jodah.failsafe.Failsafe;
1514
import net.jodah.failsafe.RetryPolicy;
@@ -39,18 +38,15 @@ public class EventHubSender {
3938
private final Counters counters;
4039
private final EventHubProperties eventHubProperties;
4140
private final ExecutorService executorService;
42-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
4341
private final LoService loService;
4442
private final Queue<LoMessage> messageQueue;
4543

4644
EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
47-
ExecutorService executorService, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
48-
LoService loService, Queue<LoMessage> messageQueue) {
45+
ExecutorService executorService, LoService loService, Queue<LoMessage> messageQueue) {
4946
this.eventHubClientFacade = eventHubClientFacade;
5047
this.counters = counters;
5148
this.eventHubProperties = eventHubProperties;
5249
this.executorService = executorService;
53-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
5450
this.loService = loService;
5551
this.messageQueue = messageQueue;
5652
}
@@ -79,10 +75,10 @@ public void send(List<LoMessage> messages) {
7975
try {
8076
List<String> messageContentList = messages.stream().map(LoMessage::message).toList();
8177
eventHubClientFacade.sendSync(messageContentList);
82-
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
78+
counters.setCloudConnectionStatus(true);
8379
} catch (EventHubClientFacadeException e) {
8480
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
85-
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
81+
counters.setCloudConnectionStatus(false);
8682
throw e;
8783
}
8884
});
@@ -94,7 +90,7 @@ private void checkConnection() {
9490
eventHubClientFacade.sendSync(new byte[0]);
9591
} catch (EventHubClientFacadeException e) {
9692
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
97-
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
93+
counters.setCloudConnectionStatus(false);
9894
}
9995
}
10096

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttReconnectHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,26 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
99

10-
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
10+
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
1111
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
1212
import org.springframework.stereotype.Component;
1313

1414
@Component
1515
public class LoMqttReconnectHandler implements DataManagementReconnectCallback {
1616

17-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
17+
private final Counters counters;
1818

19-
public LoMqttReconnectHandler(ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
20-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
19+
public LoMqttReconnectHandler(Counters counters) {
20+
this.counters = counters;
2121
}
2222

2323
@Override
2424
public void connectComplete(boolean b, String s) {
25-
connectorHealthActuatorEndpoint.setLoConnectionStatus(true);
25+
counters.setLoConnectionStatus(true);
2626
}
2727

2828
@Override
2929
public void connectionLost(Throwable throwable) {
30-
connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
30+
counters.setLoConnectionStatus(false);
3131
}
3232
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
99

10-
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
10+
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
1111
import com.orange.lo.sdk.LOApiClient;
1212
import com.orange.lo.sdk.fifomqtt.DataManagementFifo;
1313
import com.orange.lo.sdk.mqtt.exceptions.LoMqttException;
@@ -27,13 +27,13 @@ public class LoService {
2727
private final DataManagementFifo dataManagementFifo;
2828
private final LoProperties loProperties;
2929

30-
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
30+
private final Counters counters;
3131

32-
public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
32+
public LoService(LOApiClient loApiClient, Counters counters,
3333
LoProperties loProperties) {
3434
this.dataManagementFifo = loApiClient.getDataManagementFifo();
3535
this.loProperties = loProperties;
36-
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
36+
this.counters = counters;
3737
}
3838

3939
@PostConstruct
@@ -42,10 +42,10 @@ public void start() {
4242
dataManagementFifo.connect();
4343
} catch (LoMqttException e) {
4444
LOG.error("Problem with connection. Check LO credentials. ", e);
45-
connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
45+
counters.setLoConnectionStatus(false);
4646
}
4747

48-
if (connectorHealthActuatorEndpoint.isCloudConnectionStatus() && connectorHealthActuatorEndpoint.isLoConnectionStatus())
48+
if (counters.isCloudConnectionStatusUp() && counters.isLoConnectionStatusUp())
4949
dataManagementFifo.connectAndSubscribe();
5050
}
5151

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
/**
2+
* Copyright (c) Orange, Inc. and its affiliates. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
18
package com.orange.lo.sample.mqtt2eventhub.utils;
29

310
import org.springframework.boot.actuate.health.Health;
@@ -8,8 +15,11 @@
815
@Component
916
public class ConnectorHealthActuatorEndpoint implements HealthIndicator {
1017

11-
boolean cloudConnectionStatus = true;
12-
boolean loConnectionStatus = true;
18+
private Counters counters;
19+
20+
public ConnectorHealthActuatorEndpoint(Counters counters) {
21+
this.counters = counters;
22+
}
1323

1424
@Override
1525
public Health getHealth(boolean includeDetails) {
@@ -20,24 +30,8 @@ public Health getHealth(boolean includeDetails) {
2030
public Health health() {
2131
Health.Builder builder = new Health.Builder(Status.UP);
2232

23-
builder.withDetail("loMqttConnectionStatus", loConnectionStatus);
24-
builder.withDetail("cloudConnectionStatus", cloudConnectionStatus);
33+
builder.withDetail("loMqttConnectionStatus", counters.isLoConnectionStatusUp());
34+
builder.withDetail("cloudConnectionStatus", counters.isCloudConnectionStatusUp());
2535
return builder.build();
2636
}
27-
28-
public void setCloudConnectionStatus(boolean cloudConnectionStatus) {
29-
this.cloudConnectionStatus = cloudConnectionStatus;
30-
}
31-
32-
public boolean isCloudConnectionStatus() {
33-
return cloudConnectionStatus;
34-
}
35-
36-
public void setLoConnectionStatus(boolean cloudConnectionStatus) {
37-
this.loConnectionStatus = cloudConnectionStatus;
38-
}
39-
40-
public boolean isLoConnectionStatus() {
41-
return loConnectionStatus;
42-
}
4337
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/Counters.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.util.Arrays;
1515
import java.util.List;
16+
import java.util.concurrent.atomic.AtomicInteger;
1617

1718
@Component
1819
public class Counters {
@@ -22,13 +23,17 @@ public class Counters {
2223
private final Counter mesasageSentAttemptFailedCounter;
2324
private final Counter mesasageSentCounter;
2425
private final Counter mesasageSentFailedCounter;
26+
private AtomicInteger loConnectionStatus;
27+
private AtomicInteger cloudConnectionStatus;
2528

2629
public Counters(MeterRegistry meterRegistry) {
2730
mesasageReadCounter = meterRegistry.counter("message.read");
2831
mesasageSentAttemptCounter = meterRegistry.counter("message.sent.attempt");
2932
mesasageSentAttemptFailedCounter = meterRegistry.counter("message.sent.attempt.failed");
3033
mesasageSentCounter = meterRegistry.counter("message.sent");
3134
mesasageSentFailedCounter = meterRegistry.counter("message.sent.failed");
35+
loConnectionStatus = meterRegistry.gauge("status.connection.lo", new AtomicInteger(1));
36+
cloudConnectionStatus = meterRegistry.gauge("status.connection.cloud", new AtomicInteger(1));
3237
}
3338

3439
public Counter getMesasageReadCounter() {
@@ -50,7 +55,23 @@ public Counter getMesasageSentCounter() {
5055
public Counter getMesasageSentFailedCounter() {
5156
return mesasageSentFailedCounter;
5257
}
53-
58+
59+
public void setLoConnectionStatus(boolean status) {
60+
loConnectionStatus.set(status ? 1 : 0);
61+
}
62+
63+
public void setCloudConnectionStatus(boolean status) {
64+
cloudConnectionStatus.set(status ? 1 : 0);
65+
}
66+
67+
public boolean isCloudConnectionStatusUp() {
68+
return cloudConnectionStatus.get() > 0;
69+
}
70+
71+
public boolean isLoConnectionStatusUp() {
72+
return loConnectionStatus.get() > 0;
73+
}
74+
5475
public List<Counter> getAll() {
5576
return Arrays.asList(mesasageReadCounter, mesasageSentAttemptCounter, mesasageSentAttemptFailedCounter, mesasageSentCounter, mesasageSentFailedCounter);
5677
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/utils/MeterRegistryConfig.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
1212
import io.micrometer.core.instrument.Clock;
1313
import io.micrometer.core.instrument.Counter;
14+
import io.micrometer.core.instrument.Gauge;
15+
import io.micrometer.core.instrument.Meter;
1416
import io.micrometer.core.instrument.MeterRegistry;
1517
import io.micrometer.core.instrument.config.MeterFilter;
1618
import io.micrometer.core.instrument.step.StepMeterRegistry;
@@ -30,15 +32,20 @@
3032

3133
import java.lang.invoke.MethodHandles;
3234
import java.time.Duration;
35+
import java.util.List;
3336
import java.util.concurrent.Executors;
3437
import java.util.concurrent.ThreadFactory;
3538
import java.util.concurrent.TimeUnit;
39+
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
3641

3742
@Configuration
3843
public class MeterRegistryConfig {
3944

4045
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
4146
private static final String AWS_SERVICE_PROFILE_NAME = "service-profile";
47+
public static final String MESSAGE_METRICS_PREFIX = "message";
48+
public static final String STATUS_METRICS_PREFIX = "status";
4249
private final MetricsProperties metricsProperties;
4350

4451
public MeterRegistryConfig(MetricsProperties metricsProperties) {
@@ -66,7 +73,7 @@ private CloudWatchMeterRegistry getCloudWatchMeterRegistry() {
6673
CloudWatchMeterRegistry cloudWatchMeterRegistry = new CloudWatchMeterRegistry(cloudWatchConfig(), Clock.SYSTEM, cloudWatchAsyncClient);
6774

6875
cloudWatchMeterRegistry.config()
69-
.meterFilter(MeterFilter.deny(id -> !id.getName().startsWith("message")))
76+
.meterFilter(MeterFilter.deny(id -> !isAConnectorMetric(id)))
7077
.commonTags(metricsProperties.getDimensionName(), metricsProperties.getDimensionValue());
7178
return cloudWatchMeterRegistry;
7279
}
@@ -99,6 +106,11 @@ public String namespace() {
99106
};
100107
}
101108

109+
private static boolean isAConnectorMetric(Meter.Id id) {
110+
String name = id.getName();
111+
return name.startsWith(MESSAGE_METRICS_PREFIX) || name.startsWith(STATUS_METRICS_PREFIX);
112+
}
113+
102114
private StepMeterRegistry stepMeterRegistry() {
103115
StepMeterRegistry stepMeterRegistry = new StepMeterRegistry(stepRegistryConfig(), Clock.SYSTEM) {
104116

@@ -110,10 +122,22 @@ protected TimeUnit getBaseTimeUnit() {
110122

111123
@Override
112124
protected void publish() {
113-
getMeters().stream()
114-
.filter(m -> m.getId().getName().startsWith("message") )
115-
.map(m -> get(m.getId().getName()).counter())
116-
.forEach(c -> LOG.info("{} = {}", c.getId().getName(), val(c)));
125+
Stream<String> message = getMeters().stream()
126+
.filter(m -> m.getId().getName().startsWith(MESSAGE_METRICS_PREFIX))
127+
.map(m -> {
128+
Counter counter = get(m.getId().getName()).counter();
129+
return counter.getId().getName() + " = " + val(counter);
130+
});
131+
132+
Stream<String> status = getMeters().stream()
133+
.filter(m -> m.getId().getName().startsWith(STATUS_METRICS_PREFIX))
134+
.map(m -> {
135+
Gauge gauge = get(m.getId().getName()).gauge();
136+
return gauge.getId().getName() + " = " + Math.round(gauge.value());
137+
});
138+
139+
List<String> collect = Stream.concat(message, status).collect(Collectors.toList());
140+
collect.forEach(LOG::info);
117141
}
118142

119143
@Override

0 commit comments

Comments
 (0)