Skip to content

Commit a5f6cf9

Browse files
authored
Connection status (#8)
Improvement of Event Hub Monitoring
1 parent 739f727 commit a5f6cf9

File tree

5 files changed

+113
-33
lines changed

5 files changed

+113
-33
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.orange.lo.sample.mqtt2eventhub.evthub;
9+
10+
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
11+
import com.microsoft.azure.eventhubs.EventData;
12+
import com.microsoft.azure.eventhubs.EventHubClient;
13+
import com.microsoft.azure.eventhubs.EventHubException;
14+
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import org.springframework.stereotype.Component;
18+
19+
import javax.annotation.PostConstruct;
20+
import javax.annotation.PreDestroy;
21+
import java.lang.invoke.MethodHandles;
22+
import java.nio.charset.Charset;
23+
import java.time.Duration;
24+
import java.time.temporal.ChronoUnit;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
28+
@Component
29+
public class EventHubClientFacade {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
32+
public static final String EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE = "The event hub client has not been initialized. Check the validity of your credentials";
33+
34+
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
35+
private final EventHubProperties eventHubProperties;
36+
private EventHubClient eventHubClient;
37+
38+
public EventHubClientFacade(EventHubProperties eventHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
39+
this.eventHubProperties = eventHubProperties;
40+
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
41+
}
42+
43+
@PostConstruct
44+
private void init() {
45+
ConnectionStringBuilder conn = new ConnectionStringBuilder()
46+
.setOperationTimeout(Duration.of(eventHubProperties.getConnectionTimeout(), ChronoUnit.MILLIS))
47+
.setNamespaceName(eventHubProperties.getNameSpace())
48+
.setEventHubName(eventHubProperties.getEvtHubName())
49+
.setSasKeyName(eventHubProperties.getSasKeyName())
50+
.setSasKey(eventHubProperties.getSasKey());
51+
52+
this.eventHubClient = null;
53+
try {
54+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(eventHubProperties.getThreadPoolSize());
55+
this.eventHubClient = EventHubClient.createFromConnectionStringSync(conn.toString(), executor);
56+
} catch (Exception e) {
57+
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
58+
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
59+
}
60+
}
61+
62+
public void sendSync(String msg) throws EventHubClientFacadeException {
63+
byte[] payloadBytes = msg.getBytes(Charset.defaultCharset());
64+
sendSync(payloadBytes);
65+
}
66+
67+
public void sendSync(byte[] msg) throws EventHubClientFacadeException {
68+
EventData sendEvent = EventData.create(msg);
69+
70+
try {
71+
eventHubClient.sendSync(sendEvent);
72+
} catch (NullPointerException e) {
73+
throw new EventHubClientFacadeException(EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE);
74+
} catch (EventHubException e) {
75+
throw new EventHubClientFacadeException(e);
76+
}
77+
}
78+
79+
@PreDestroy
80+
public void onDestroy() {
81+
try {
82+
LOG.info("shutting down eventHubClient");
83+
eventHubClient.closeSync();
84+
} catch (EventHubException e) {
85+
LOG.error(e.getMessage());
86+
}
87+
}
88+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.orange.lo.sample.mqtt2eventhub.evthub;
9+
10+
public class EventHubClientFacadeException extends Exception {
11+
public EventHubClientFacadeException(Exception e) {
12+
super(e.getMessage(), e);
13+
}
14+
15+
public EventHubClientFacadeException(String message) {
16+
super(message);
17+
}
18+
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubConfig.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,9 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.evthub;
99

10-
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
11-
import com.microsoft.azure.eventhubs.EventHubClient;
12-
import com.microsoft.azure.eventhubs.EventHubException;
1310
import org.springframework.context.annotation.Bean;
1411
import org.springframework.context.annotation.Configuration;
1512

16-
import java.io.IOException;
17-
import java.time.Duration;
18-
import java.time.temporal.ChronoUnit;
1913
import java.util.concurrent.ExecutorService;
2014
import java.util.concurrent.Executors;
2115

@@ -28,18 +22,6 @@ public EventHubConfig(EventHubProperties eventHubProperties) {
2822
this.eventHubProperties = eventHubProperties;
2923
}
3024

31-
@Bean
32-
public EventHubClient eventHubClient() throws EventHubException, IOException {
33-
ConnectionStringBuilder conn = new ConnectionStringBuilder()
34-
.setOperationTimeout(Duration.of(eventHubProperties.getConnectionTimeout(), ChronoUnit.MILLIS))
35-
.setNamespaceName(eventHubProperties.getNameSpace())
36-
.setEventHubName(eventHubProperties.getEvtHubName())
37-
.setSasKeyName(eventHubProperties.getSasKeyName())
38-
.setSasKey(eventHubProperties.getSasKey());
39-
40-
return EventHubClient.createFromConnectionStringSync(conn.toString(), Executors.newScheduledThreadPool(eventHubProperties.getThreadPoolSize()));
41-
}
42-
4325
@Bean
4426
public ExecutorService executorService() {
4527
return Executors.newFixedThreadPool(eventHubProperties.getThreadPoolSize());

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.evthub;
99

10-
import com.microsoft.azure.eventhubs.*;
1110
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoProperties;
1211
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
1312
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
@@ -20,7 +19,6 @@
2019

2120
import javax.annotation.PostConstruct;
2221
import java.lang.invoke.MethodHandles;
23-
import java.nio.charset.Charset;
2422
import java.time.Duration;
2523
import java.time.temporal.ChronoUnit;
2624
import java.util.concurrent.ExecutorService;
@@ -30,18 +28,18 @@ public class EventHubSender {
3028

3129
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3230

33-
private final EventHubClient eventHubClient;
31+
private final EventHubClientFacade eventHubClientFacade;
3432
private final Counters counters;
3533
private final EventHubProperties eventHubProperties;
3634
private final LoProperties loProperties;
3735
private final ExecutorService executorService;
3836
private final LOApiClient loApiClient;
3937
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
4038

41-
EventHubSender(EventHubClient eventHubClient, Counters counters, EventHubProperties eventHubProperties,
39+
EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
4240
LoProperties loProperties, ExecutorService executorService, LOApiClient loApiClient,
4341
ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
44-
this.eventHubClient = eventHubClient;
42+
this.eventHubClientFacade = eventHubClientFacade;
4543
this.counters = counters;
4644
this.eventHubProperties = eventHubProperties;
4745
this.loProperties = loProperties;
@@ -52,9 +50,6 @@ public class EventHubSender {
5250

5351
public void send(int loMessageId, String msg) {
5452

55-
byte[] payloadBytes = msg.getBytes(Charset.defaultCharset());
56-
EventData sendEvent = EventData.create(payloadBytes);
57-
5853
Failsafe.with(
5954
new RetryPolicy<Void>()
6055
.withMaxAttempts(eventHubProperties.getMaxSendAttempts())
@@ -76,23 +71,21 @@ public void send(int loMessageId, String msg) {
7671
).with(executorService).run(execution -> {
7772
counters.getMesasageSentAttemptCounter().increment();
7873
try {
79-
eventHubClient.sendSync(sendEvent);
74+
eventHubClientFacade.sendSync(msg);
8075
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
81-
} catch (EventHubException e) {
76+
} catch (EventHubClientFacadeException e) {
8277
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
8378
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
8479
throw e;
8580
}
8681
});
8782
}
8883

89-
9084
@PostConstruct
9185
private void checkConnection() {
92-
EventData sendEvent = EventData.create(new byte[0]);
9386
try {
94-
eventHubClient.sendSync(sendEvent);
95-
} catch (EventHubException e) {
87+
eventHubClientFacade.sendSync(new byte[0]);
88+
} catch (EventHubClientFacadeException e) {
9689
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
9790
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
9891
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
1212
import com.orange.lo.sdk.LOApiClient;
1313
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
14-
import io.micrometer.core.instrument.Counter;
1514
import org.slf4j.Logger;
1615
import org.slf4j.LoggerFactory;
1716
import org.springframework.context.annotation.Lazy;

0 commit comments

Comments
 (0)