Skip to content

Commit 9d677d7

Browse files
Circular references (#12)
Breaking of circular references, java 21
1 parent 4afb4bd commit 9d677d7

24 files changed

+556
-99
lines changed

.github/workflows/manual_release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ jobs:
77
build:
88
runs-on: ubuntu-latest
99
steps:
10-
- uses: actions/checkout@v2
10+
- uses: actions/checkout@v4
1111

1212
- name: Create tag by maven-release plugin
13-
uses: qcastel/[email protected].2
13+
uses: qcastel/[email protected].41
1414
with:
1515
release-branch-name: "master"
1616
git-release-bot-name: "auto"

.github/workflows/on_push.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ jobs:
88
runs-on: ubuntu-latest
99

1010
steps:
11-
- uses: actions/checkout@v2
12-
- name: Set up JDK 1.8
13-
uses: actions/setup-java@v1
11+
- uses: actions/checkout@v4
12+
- name: Set up JDK
13+
uses: actions/setup-java@v2
1414
with:
15-
java-version: 1.8
15+
distribution: 'zulu'
16+
java-version: '21'
1617
- name: Build with Maven
1718
run: mvn clean test --file pom.xml

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
<description>Demo project for Spring Boot</description>
1515

1616
<properties>
17-
<java.version>1.8</java.version>
17+
<java.version>21</java.version>
1818
</properties>
1919

2020
<repositories>
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>com.github.DatavenueLiveObjects</groupId>
3030
<artifactId>FIFO_SDK_LiveObjects</artifactId>
31-
<version>v1.16</version>
31+
<version>v2.0.2</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>org.springframework.boot</groupId>
@@ -82,13 +82,13 @@
8282
<dependency>
8383
<groupId>com.tngtech.archunit</groupId>
8484
<artifactId>archunit</artifactId>
85-
<version>0.23.1</version>
85+
<version>1.4.1</version>
8686
<scope>test</scope>
8787
</dependency>
8888
<dependency>
8989
<groupId>com.tngtech.archunit</groupId>
9090
<artifactId>archunit-junit5</artifactId>
91-
<version>0.23.1</version>
91+
<version>1.4.1</version>
9292
<scope>test</scope>
9393
</dependency>
9494
<dependency>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.orange.lo.sample.mqtt2eventhub;
9+
10+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
14+
import java.util.Queue;
15+
import java.util.concurrent.ConcurrentLinkedQueue;
16+
17+
@Configuration
18+
public class ApplicationConfig {
19+
20+
@Bean
21+
public Queue<LoMessage> messageQueue() {
22+
return new ConcurrentLinkedQueue<>();
23+
}
24+
25+
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubClientFacade.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.charset.Charset;
2323
import java.time.Duration;
2424
import java.time.temporal.ChronoUnit;
25+
import java.util.List;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.ScheduledExecutorService;
2728

@@ -59,16 +60,28 @@ private void init() {
5960
}
6061
}
6162

63+
public void sendSync(List<String> messages) throws EventHubClientFacadeException {
64+
List<EventData> list = messages.stream().map(msg -> {
65+
byte[] bytes = msg.getBytes(Charset.defaultCharset());
66+
return EventData.create(bytes);
67+
}).toList();
68+
sendSync(list);
69+
}
70+
6271
public void sendSync(String msg) throws EventHubClientFacadeException {
6372
byte[] payloadBytes = msg.getBytes(Charset.defaultCharset());
6473
sendSync(payloadBytes);
6574
}
6675

67-
public void sendSync(byte[] msg) throws EventHubClientFacadeException {
68-
EventData sendEvent = EventData.create(msg);
76+
public void sendSync(byte[] payloadBytes) throws EventHubClientFacadeException {
77+
EventData sendEvent = EventData.create(payloadBytes);
78+
List<EventData> list = List.of(sendEvent);
79+
sendSync(list);
80+
}
6981

82+
public void sendSync(Iterable<EventData> eventDatas) throws EventHubClientFacadeException {
7083
try {
71-
eventHubClient.sendSync(sendEvent);
84+
eventHubClient.sendSync(eventDatas);
7285
} catch (NullPointerException e) {
7386
throw new EventHubClientFacadeException(EVENT_HUB_CLIENT_NOT_INITIALIZED_MESSAGE);
7487
} catch (EventHubException e) {

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class EventHubProperties {
2727
@DurationUnit(ChronoUnit.MILLIS)
2828
private Duration throttlingDelay = Duration.ofMillis(1);
2929
private int maxSendAttempts = 3;
30+
private Integer messageBatchSize;
3031

3132
public String getNameSpace() {
3233
return nameSpace;
@@ -99,4 +100,12 @@ public int getMaxSendAttempts() {
99100
public void setMaxSendAttempts(int maxSendAttempts) {
100101
this.maxSendAttempts = maxSendAttempts;
101102
}
103+
104+
public Integer getMessageBatchSize() {
105+
return messageBatchSize;
106+
}
107+
108+
public void setMessageBatchSize(Integer messageBatchSize) {
109+
this.messageBatchSize = messageBatchSize;
110+
}
102111
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/evthub/EventHubSender.java

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,71 +7,78 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.evthub;
99

10-
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoProperties;
10+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
11+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
1112
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
1213
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
13-
import com.orange.lo.sdk.LOApiClient;
1414
import net.jodah.failsafe.Failsafe;
1515
import net.jodah.failsafe.RetryPolicy;
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
18+
import org.springframework.scheduling.annotation.EnableScheduling;
19+
import org.springframework.scheduling.annotation.Scheduled;
1820
import org.springframework.stereotype.Component;
1921

2022
import javax.annotation.PostConstruct;
2123
import java.lang.invoke.MethodHandles;
2224
import java.time.Duration;
2325
import java.time.temporal.ChronoUnit;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Queue;
2429
import java.util.concurrent.ExecutorService;
2530

2631
@Component
32+
@EnableScheduling
2733
public class EventHubSender {
2834

2935
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
36+
private static final int DEFAULT_BATCH_SIZE = 10;
3037

3138
private final EventHubClientFacade eventHubClientFacade;
3239
private final Counters counters;
3340
private final EventHubProperties eventHubProperties;
34-
private final LoProperties loProperties;
3541
private final ExecutorService executorService;
36-
private final LOApiClient loApiClient;
3742
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
43+
private final LoService loService;
44+
private final Queue<LoMessage> messageQueue;
3845

3946
EventHubSender(EventHubClientFacade eventHubClientFacade, Counters counters, EventHubProperties eventHubProperties,
40-
LoProperties loProperties, ExecutorService executorService, LOApiClient loApiClient,
41-
ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
47+
ExecutorService executorService, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint,
48+
LoService loService, Queue<LoMessage> messageQueue) {
4249
this.eventHubClientFacade = eventHubClientFacade;
4350
this.counters = counters;
4451
this.eventHubProperties = eventHubProperties;
45-
this.loProperties = loProperties;
4652
this.executorService = executorService;
47-
this.loApiClient = loApiClient;
4853
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
54+
this.loService = loService;
55+
this.messageQueue = messageQueue;
4956
}
5057

51-
public void send(int loMessageId, String msg) {
52-
58+
public void send(List<LoMessage> messages) {
5359
Failsafe.with(
5460
new RetryPolicy<Void>()
5561
.withMaxAttempts(eventHubProperties.getMaxSendAttempts())
5662
.withBackoff(eventHubProperties.getThrottlingDelay().toMillis(), Duration.ofMinutes(1).toMillis(), ChronoUnit.MILLIS)
5763
.onRetry(r -> {
5864
LOG.debug("Problem while sending message to Event Hub because of: {}. Retrying...", r.getLastFailure().getMessage());
59-
counters.getMesasageSentAttemptFailedCounter().increment();
65+
counters.getMesasageSentAttemptFailedCounter().increment(messages.size());
6066
})
6167
.onSuccess(r -> {
62-
LOG.debug("Message was sent to Event Hub");
63-
counters.getMesasageSentCounter().increment();
64-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
68+
LOG.debug("Batch of messages of the following size were sent: {}", messages.size());
69+
counters.getMesasageSentCounter().increment(messages.size());
70+
messages.forEach(m -> loService.sendAck(m.messageId()));
6571
})
6672
.onFailure(r -> {
67-
LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, r.getFailure());
68-
counters.getMesasageSentFailedCounter().increment();
69-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
73+
LOG.error("Cannot send messages to Event Hub because of {}", r.getFailure());
74+
counters.getMesasageSentFailedCounter().increment(messages.size());
75+
messages.forEach(m -> loService.sendAck(m.messageId()));
7076
})
7177
).with(executorService).run(execution -> {
72-
counters.getMesasageSentAttemptCounter().increment();
78+
counters.getMesasageSentAttemptCounter().increment(messages.size());
7379
try {
74-
eventHubClientFacade.sendSync(msg);
80+
List<String> messageContentList = messages.stream().map(LoMessage::message).toList();
81+
eventHubClientFacade.sendSync(messageContentList);
7582
connectorHealthActuatorEndpoint.setCloudConnectionStatus(true);
7683
} catch (EventHubClientFacadeException e) {
7784
LOG.error("Problem with connection. Check Event Hub credentials. " + e.getMessage(), e);
@@ -90,4 +97,28 @@ private void checkConnection() {
9097
connectorHealthActuatorEndpoint.setCloudConnectionStatus(false);
9198
}
9299
}
100+
101+
@Scheduled(fixedRateString = "${azure.evt-hub.synchronization-interval}")
102+
public void send() {
103+
LOG.debug("Number of messages waiting to be sent: {}", messageQueue.size());
104+
if (!messageQueue.isEmpty()) {
105+
LOG.info("Start sending messages...");
106+
107+
int batchSize = eventHubProperties.getMessageBatchSize() != null
108+
? eventHubProperties.getMessageBatchSize() : DEFAULT_BATCH_SIZE;
109+
110+
List<LoMessage> messageBatch = new ArrayList<>(batchSize);
111+
while (!messageQueue.isEmpty()) {
112+
messageBatch.add(messageQueue.poll());
113+
if (messageBatch.size() == batchSize) {
114+
send(new ArrayList<>(messageBatch));
115+
messageBatch.clear();
116+
}
117+
}
118+
if (!messageBatch.isEmpty()) {
119+
send(new ArrayList<>(messageBatch));
120+
}
121+
}
122+
}
123+
93124
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoConfig.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import com.orange.lo.sdk.LOApiClient;
1111
import com.orange.lo.sdk.LOApiClientParameters;
12+
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
13+
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
1214
import org.apache.maven.model.Model;
1315
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
1416
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
@@ -25,12 +27,14 @@
2527
public class LoConfig {
2628

2729
private final LoProperties loProperties;
28-
private final LoMqttHandler loMqttHandler;
30+
private final DataManagementFifoCallback loMqttHandler;
31+
private final DataManagementReconnectCallback reconnectHandler;
2932

30-
public LoConfig(LoProperties loProperties, LoMqttHandler loMqttHandler) {
33+
public LoConfig(LoProperties loProperties, DataManagementFifoCallback loMqttHandler, DataManagementReconnectCallback reconnectHandler) {
3134
this.loProperties = loProperties;
3235
this.loMqttHandler = loMqttHandler;
33-
}
36+
this.reconnectHandler = reconnectHandler;
37+
}
3438

3539
@Bean
3640
public LOApiClient loApiClient() {
@@ -51,6 +55,7 @@ private LOApiClientParameters loApiClientParameters() {
5155
.dataManagementMqttCallback(loMqttHandler)
5256
.connectorType(loProperties.getConnectorType())
5357
.connectorVersion(getConnectorVersion())
58+
.dataManagementReconnectCallback(reconnectHandler)
5459
.build();
5560
}
5661

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
2+
3+
public record LoMessage(int messageId, String message) {
4+
5+
}

src/main/java/com/orange/lo/sample/mqtt2eventhub/liveobjects/LoMqttHandler.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,35 @@
77

88
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
99

10-
import com.orange.lo.sample.mqtt2eventhub.evthub.EventHubSender;
1110
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
12-
import com.orange.lo.sdk.LOApiClient;
1311
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
12+
import io.micrometer.core.instrument.Counter;
1413
import org.slf4j.Logger;
1514
import org.slf4j.LoggerFactory;
16-
import org.springframework.context.annotation.Lazy;
1715
import org.springframework.stereotype.Component;
1816

1917
import java.lang.invoke.MethodHandles;
18+
import java.util.Queue;
2019

2120
@Component
2221
public class LoMqttHandler implements DataManagementFifoCallback {
2322

2423
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
2524

26-
private final EventHubSender eventHubSender;
27-
private final Counters counters;
28-
private final LOApiClient loApiClient;
29-
private final LoProperties loProperties;
25+
private final Counter mesasageReadCounter;
26+
private final Queue<LoMessage> messageQueue;
3027

31-
public LoMqttHandler(@Lazy EventHubSender eventHubSender, @Lazy LOApiClient loApiClient, Counters counters, LoProperties loProperties) {
32-
this.eventHubSender = eventHubSender;
33-
this.counters = counters;
34-
this.loApiClient = loApiClient;
35-
this.loProperties = loProperties;
28+
public LoMqttHandler(Counters counterProvider, Queue<LoMessage> messageQueue) {
29+
LOG.info("LoMqttHandler init...");
30+
31+
this.mesasageReadCounter = counterProvider.getMesasageReadCounter();
32+
this.messageQueue = messageQueue;
3633
}
3734

3835
@Override
39-
public void onMessage(int loMessageId, String message) {
40-
counters.getMesasageReadCounter().increment();
41-
try {
42-
eventHubSender.send(loMessageId, message);
43-
} catch (Exception e) {
44-
LOG.error("Cannot send message with id = {} to Event Hub because of {}", loMessageId, e);
45-
counters.getMesasageSentFailedCounter().increment();
46-
loApiClient.getDataManagementFifo().sendAck(loMessageId, loProperties.getMessageQos());
47-
}
36+
public void onMessage(int messageId, String message) {
37+
LOG.debug("Received message with id = {}", messageId);
38+
mesasageReadCounter.increment();
39+
messageQueue.add(new LoMessage(messageId, message));
4840
}
4941
}

0 commit comments

Comments
 (0)