Skip to content

Commit 82049f6

Browse files
Breaking of circular references, tests
1 parent 4afb4bd commit 82049f6

File tree

17 files changed

+185
-91
lines changed

17 files changed

+185
-91
lines changed

.github/workflows/manual_release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ jobs:
77
build:
88
runs-on: ubuntu-latest
99
steps:
10-
- uses: actions/checkout@v2
10+
- uses: actions/checkout@v4
1111

1212
- name: Create tag by maven-release plugin
13-
uses: qcastel/[email protected].2
13+
uses: qcastel/[email protected].41
1414
with:
1515
release-branch-name: "master"
1616
git-release-bot-name: "auto"

.github/workflows/on_push.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ jobs:
88
runs-on: ubuntu-latest
99

1010
steps:
11-
- uses: actions/checkout@v2
12-
- name: Set up JDK 1.8
13-
uses: actions/setup-java@v1
11+
- uses: actions/checkout@v4
12+
- name: Set up JDK
13+
uses: actions/setup-java@v2
1414
with:
15-
java-version: 1.8
15+
distribution: 'zulu'
16+
java-version: '21'
1617
- name: Build with Maven
1718
run: mvn clean test --file pom.xml

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<description>Demo project for Spring Boot</description>
1515

1616
<properties>
17-
<java.version>1.8</java.version>
17+
<java.version>21</java.version>
1818
</properties>
1919

2020
<repositories>
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>com.github.DatavenueLiveObjects</groupId>
3030
<artifactId>FIFO_SDK_LiveObjects</artifactId>
31-
<version>v1.16</version>
31+
<version>v2.0.2</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>org.springframework.boot</groupId>

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.charset.Charset;
2323
import java.time.Duration;
2424
import java.time.temporal.ChronoUnit;
25+
import java.util.List;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.ScheduledExecutorService;
2728

@@ -59,16 +60,28 @@ private void init() {
5960
}
6061
}
6162

63+
public void sendSync(List<String> messages) throws EventHubClientFacadeException {
64+
List<EventData> list = messages.stream().map(msg -> {
65+
byte[] bytes = msg.getBytes(Charset.defaultCharset());
66+
return EventData.create(bytes);
67+
}).toList();
68+
sendSync(list);
69+
}
70+
6271
public void sendSync(String msg) throws EventHubClientFacadeException {
6372
byte[] payloadBytes = msg.getBytes(Charset.defaultCharset());
6473
sendSync(payloadBytes);
6574
}
6675

67-
public void sendSync(byte[] msg) throws EventHubClientFacadeException {
68-
EventData sendEvent = EventData.create(msg);
76+
public void sendSync(byte[] payloadBytes) throws EventHubClientFacadeException {
77+
EventData sendEvent = EventData.create(payloadBytes);
78+
List<EventData> list = List.of(sendEvent);
79+
sendSync(list);
80+
}
6981

82+
public void sendSync(Iterable<EventData> eventDatas) throws EventHubClientFacadeException {
7083
try {
71-
eventHubClient.sendSync(sendEvent);
84+
eventHubClient.sendSync(eventDatas);
7285
} catch (NullPointerException e) {
7386
throw new EventHubClientFacadeException(EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE);
7487
} catch (EventHubException e) {

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class EventHubProperties {
2727
@DurationUnit(ChronoUnit.MILLIS)
2828
private Duration throttlingDelay = Duration.ofMillis(1);
2929
private int maxSendAttempts = 3;
30+
private Integer messageBatchSize;
3031

3132
public String getNameSpace() {
3233
return nameSpace;
@@ -99,4 +100,12 @@ public int getMaxSendAttempts() {
99100
public void setMaxSendAttempts(int maxSendAttempts) {
100101
this.maxSendAttempts = maxSendAttempts;
101102
}
103+
104+
public Integer getMessageBatchSize() {
105+
return messageBatchSize;
106+
}
107+
108+
public void setMessageBatchSize(Integer messageBatchSize) {
109+
this.messageBatchSize = messageBatchSize;
110+
}
102111
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,49 +7,53 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.evthub;
99

10-
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoProperties;
10+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
11+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
1112
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
1213
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
13-
import com.orange.lo.sdk.LOApiClient;
1414
import net.jodah.failsafe.Failsafe;
1515
import net.jodah.failsafe.RetryPolicy;
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
18+
import org.springframework.scheduling.annotation.Scheduled;
1819
import org.springframework.stereotype.Component;
1920

2021
import javax.annotation.PostConstruct;
2122
import java.lang.invoke.MethodHandles;
2223
import java.time.Duration;
2324
import java.time.temporal.ChronoUnit;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Queue;
2428
import java.util.concurrent.ExecutorService;
2529

2630
@Component
2731
public class EventHubSender {
2832

2933
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
34+
private static final int DEFAULT_BATCH_SIZE = 10;
3035

3136
private final EventHubClientFacade eventHubClientFacade;
3237
private final Counters counters;
3338
private final EventHubProperties eventHubProperties;
34-
private final LoProperties loProperties;
3539
private final ExecutorService executorService;
36-
private final LOApiClient loApiClient;
3740
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
41+
private final LoService loService;
42+
private final Queue<LoMessage> messageQueue;
3843

3944
EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
40-
LoProperties loProperties, ExecutorService executorService, LOApiClient loApiClient,
41-
ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
45+
ExecutorService executorService, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
46+
LoService loService, Queue<LoMessage> messageQueue) {
4247
this.eventHubClientFacade = eventHubClientFacade;
4348
this.counters = counters;
4449
this.eventHubProperties = eventHubProperties;
45-
this.loProperties = loProperties;
4650
this.executorService = executorService;
47-
this.loApiClient = loApiClient;
4851
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
52+
this.loService = loService;
53+
this.messageQueue = messageQueue;
4954
}
5055

51-
public void send(int loMessageId, String msg) {
52-
56+
public void send(List<LoMessage> messages) {
5357
Failsafe.with(
5458
new RetryPolicy<Void>()
5559
.withMaxAttempts(eventHubProperties.getMaxSendAttempts())
@@ -61,17 +65,18 @@ public void send(int loMessageId, String msg) {
6165
.onSuccess(r -> {
6266
LOG.debug("Message was sent to Event Hub");
6367
counters.getMesasageSentCounter().increment();
64-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
68+
messages.forEach(m -> loService.sendAck(m.messageId()));
6569
})
6670
.onFailure(r -> {
67-
LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, r.getFailure());
71+
LOG.error("Cannot send messages to Event Hub because of {}", r.getFailure());
6872
counters.getMesasageSentFailedCounter().increment();
69-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
73+
messages.forEach(m -> loService.sendAck(m.messageId()));
7074
})
7175
).with(executorService).run(execution -> {
7276
counters.getMesasageSentAttemptCounter().increment();
7377
try {
74-
eventHubClientFacade.sendSync(msg);
78+
List<String> messageContentList = messages.stream().map(LoMessage::message).toList();
79+
eventHubClientFacade.sendSync(messageContentList);
7580
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
7681
} catch (EventHubClientFacadeException e) {
7782
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
@@ -90,4 +95,27 @@ private void checkConnection() {
9095
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
9196
}
9297
}
98+
99+
@Scheduled(fixedRateString = "${azure.evt-hub.synchronization-interval}")
100+
public void send() {
101+
if (!messageQueue.isEmpty()) {
102+
LOG.info("Start sending messages...");
103+
104+
int batchSize = eventHubProperties.getMessageBatchSize() != null
105+
? eventHubProperties.getMessageBatchSize() : DEFAULT_BATCH_SIZE;
106+
107+
List<LoMessage> messageBatch = new ArrayList<>(batchSize);
108+
while (!messageQueue.isEmpty()) {
109+
messageBatch.add(messageQueue.poll());
110+
if (messageBatch.size() == batchSize) {
111+
send(new ArrayList<>(messageBatch));
112+
messageBatch.clear();
113+
}
114+
}
115+
if (!messageBatch.isEmpty()) {
116+
send(new ArrayList<>(messageBatch));
117+
}
118+
}
119+
}
120+
93121
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import com.orange.lo.sdk.LOApiClient;
1111
import com.orange.lo.sdk.LOApiClientParameters;
12+
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
13+
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
1214
import org.apache.maven.model.Model;
1315
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
1416
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
@@ -25,12 +27,14 @@
2527
public class LoConfig {
2628

2729
private final LoProperties loProperties;
28-
private final LoMqttHandler loMqttHandler;
30+
private final DataManagementFifoCallback loMqttHandler;
31+
private final DataManagementReconnectCallback reconnectHandler;
2932

30-
public LoConfig(LoProperties loProperties, LoMqttHandler loMqttHandler) {
33+
public LoConfig(LoProperties loProperties, DataManagementFifoCallback loMqttHandler, DataManagementReconnectCallback reconnectHandler) {
3134
this.loProperties = loProperties;
3235
this.loMqttHandler = loMqttHandler;
33-
}
36+
this.reconnectHandler = reconnectHandler;
37+
}
3438

3539
@Bean
3640
public LOApiClient loApiClient() {
@@ -51,6 +55,7 @@ private LOApiClientParameters loApiClientParameters() {
5155
.dataManagementMqttCallback(loMqttHandler)
5256
.connectorType(loProperties.getConnectorType())
5357
.connectorVersion(getConnectorVersion())
58+
.dataManagementReconnectCallback(reconnectHandler)
5459
.build();
5560
}
5661

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,35 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
99

10-
import com.orange.lo.sample.mqtt2eventhub.evthub.EventHubSender;
1110
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
12-
import com.orange.lo.sdk.LOApiClient;
1311
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
12+
import io.micrometer.core.instrument.Counter;
1413
import org.slf4j.Logger;
1514
import org.slf4j.LoggerFactory;
16-
import org.springframework.context.annotation.Lazy;
1715
import org.springframework.stereotype.Component;
1816

1917
import java.lang.invoke.MethodHandles;
18+
import java.util.Queue;
2019

2120
@Component
2221
public class LoMqttHandler implements DataManagementFifoCallback {
2322

2423
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
2524

26-
private final EventHubSender eventHubSender;
27-
private final Counters counters;
28-
private final LOApiClient loApiClient;
29-
private final LoProperties loProperties;
25+
private final Counter mesasageReadCounter;
26+
private final Queue<LoMessage> messageQueue;
3027

31-
public LoMqttHandler(@Lazy EventHubSender eventHubSender, @Lazy LOApiClient loApiClient, Counters counters, LoProperties loProperties) {
32-
this.eventHubSender = eventHubSender;
33-
this.counters = counters;
34-
this.loApiClient = loApiClient;
35-
this.loProperties = loProperties;
28+
public LoMqttHandler(Counters counterProvider, Queue<LoMessage> messageQueue) {
29+
LOG.info("LoMqttHandler init...");
30+
31+
this.mesasageReadCounter = counterProvider.getMesasageReadCounter();
32+
this.messageQueue = messageQueue;
3633
}
3734

3835
@Override
39-
public void onMessage(int loMessageId, String message) {
40-
counters.getMesasageReadCounter().increment();
41-
try {
42-
eventHubSender.send(loMessageId, message);
43-
} catch (Exception e) {
44-
LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, e);
45-
counters.getMesasageSentFailedCounter().increment();
46-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
47-
}
36+
public void onMessage(int messageId, String message) {
37+
LOG.debug("Received message with id = {}", messageId);
38+
mesasageReadCounter.increment();
39+
messageQueue.add(new LoMessage(messageId, message));
4840
}
4941
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
public class LoProperties {
1616

1717
private static final String CONNECTOR_TYPE = "LO_AZURE_EVENTHUB_ADAPTER";
18-
18+
1919
private final String hostname;
2020
private final String apiKey;
2121
private final String topic;
@@ -47,7 +47,7 @@ public LoProperties(
4747
public String getConnectorType() {
4848
return CONNECTOR_TYPE;
4949
}
50-
50+
5151
public String getHostname() {
5252
return hostname;
5353
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ public class LoService {
2525
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
2626

2727
private final DataManagementFifo dataManagementFifo;
28+
private final LoProperties loProperties;
2829

2930
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
3031

31-
public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
32+
public LoService(LOApiClient loApiClient, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
33+
LoProperties loProperties) {
3234
this.dataManagementFifo = loApiClient.getDataManagementFifo();
35+
this.loProperties = loProperties;
3336
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
3437
}
3538

@@ -50,4 +53,9 @@ public void start() {
5053
public void stop() {
5154
dataManagementFifo.disconnect();
5255
}
56+
57+
public void sendAck(int loMessageId) {
58+
dataManagementFifo.sendAck(loMessageId, loProperties.getMessageQos());
59+
}
60+
5361
}

0 commit comments

Comments
 (0)