Skip to content

Commit 5e2664e

Browse files
Missing classes
1 parent b3f9525 commit 5e2664e

File tree

8 files changed

+371
-0
lines changed

8 files changed

+371
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.orange.lo.sample.mqtt2eventhub;
9+
10+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
11+
import org.springframework.context.annotation.Bean;
12+
import org.springframework.context.annotation.Configuration;
13+
14+
import java.util.Queue;
15+
import java.util.concurrent.ConcurrentLinkedQueue;
16+
17+
@Configuration
18+
public class ApplicationConfig {
19+
20+
@Bean
21+
public Queue<LoMessage> messageQueue() {
22+
return new ConcurrentLinkedQueue<>();
23+
}
24+
25+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
2+
3+
public record LoMessage(int messageId, String message) {
4+
5+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
9+
10+
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
11+
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.springframework.stereotype.Component;
15+
16+
import java.lang.invoke.MethodHandles;
17+
18+
@Component
19+
public class LoMqttReconnectHandler implements DataManagementReconnectCallback {
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
22+
23+
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
24+
25+
public LoMqttReconnectHandler(ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint) {
26+
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
27+
}
28+
29+
@Override
30+
public void connectComplete(boolean b, String s) {
31+
LOG.info("Hahahaah moj handler #1 {}", connectorHealthActuatorEndpoint.isLoConnectionStatus());
32+
connectorHealthActuatorEndpoint.setLoConnectionStatus(true);
33+
LOG.info("Hahahaah moj handler #2 {}", connectorHealthActuatorEndpoint.isLoConnectionStatus());
34+
}
35+
36+
@Override
37+
public void connectionLost(Throwable throwable) {
38+
connectorHealthActuatorEndpoint.setLoConnectionStatus(false);
39+
}
40+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.orange.lo.sample.mqtt2eventhub;
2+
3+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
4+
import org.junit.jupiter.api.BeforeEach;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.util.Queue;
8+
import java.util.concurrent.ConcurrentLinkedQueue;
9+
10+
import static org.junit.jupiter.api.Assertions.*;
11+
12+
class ApplicationConfigTest {
13+
14+
private ApplicationConfig applicationConfig;
15+
16+
@BeforeEach
17+
void setUp() {
18+
applicationConfig = new ApplicationConfig();
19+
}
20+
21+
@Test
22+
void messageQueueShouldBeThreadSafe() {
23+
Queue<LoMessage> loMessages = applicationConfig.messageQueue();
24+
25+
assertInstanceOf(ConcurrentLinkedQueue.class, loMessages);
26+
}
27+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package com.orange.lo.sample.mqtt2eventhub.evthub;
2+
3+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoMessage;
4+
import com.orange.lo.sample.mqtt2eventhub.liveobjects.LoService;
5+
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
6+
import com.orange.lo.sample.mqtt2eventhub.utils.Counters;
7+
import io.micrometer.core.instrument.Counter;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.ExtendWith;
11+
import org.mockito.ArgumentMatcher;
12+
import org.mockito.ArgumentMatchers;
13+
import org.mockito.Mock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
15+
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.LinkedList;
19+
import java.util.List;
20+
import java.util.Queue;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.IntStream;
24+
25+
import static org.junit.jupiter.api.Assertions.*;
26+
import static org.mockito.ArgumentMatchers.any;
27+
import static org.mockito.ArgumentMatchers.anyList;
28+
import static org.mockito.Mockito.never;
29+
import static org.mockito.Mockito.times;
30+
import static org.mockito.Mockito.verify;
31+
import static org.mockito.Mockito.when;
32+
33+
@ExtendWith(MockitoExtension.class)
34+
class EventHubSenderTest {
35+
@Mock
36+
private EventHubClientFacade eventHubClientFacade;
37+
@Mock
38+
private Counters counters;
39+
@Mock
40+
private Counter counter;
41+
@Mock
42+
private EventHubProperties eventHubProperties;
43+
@Mock
44+
private ExecutorService executorService;
45+
@Mock
46+
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
47+
@Mock
48+
private LoService loService;
49+
50+
EventHubSender eventHubSender;
51+
52+
@BeforeEach
53+
void setUp() {
54+
prepareService(new LinkedList<>());
55+
}
56+
57+
private void prepareService(LinkedList<LoMessage> messageQueue) {
58+
eventHubSender = new EventHubSender(eventHubClientFacade, counters, eventHubProperties, executorService,
59+
connectorHealthActuatorEndpoint, loService, messageQueue);
60+
}
61+
62+
@Test
63+
void shouldNotSendMessagesIfQueueIsEmpty() throws EventHubClientFacadeException {
64+
// when
65+
eventHubSender.send();
66+
67+
// then
68+
verify(eventHubClientFacade, never()).sendSync(ArgumentMatchers.<String>anyList());
69+
}
70+
71+
@Test
72+
void shouldSendMessagesInOneBatchIfQueueNotExceedMessageBatchSizeProperty() throws EventHubClientFacadeException {
73+
// given
74+
int batchSize = 5;
75+
76+
when(eventHubProperties.getMessageBatchSize()).thenReturn(batchSize);
77+
when(eventHubProperties.getMaxSendAttempts()).thenReturn(10);
78+
when(eventHubProperties.getThrottlingDelay()).thenReturn(Duration.ofMillis(1));
79+
when(counters.getMesasageSentAttemptCounter()).thenReturn(counter);
80+
81+
LinkedList<LoMessage> messageQueue = getExampleMessageQueue(batchSize);
82+
83+
prepareService(messageQueue);
84+
85+
List<String> expectedMessages = messageQueue.stream().map(LoMessage::message).toList();
86+
87+
// when
88+
eventHubSender.send();
89+
90+
// then
91+
verify(eventHubClientFacade, times(1)).sendSync(expectedMessages);
92+
}
93+
94+
@Test
95+
void shouldSplitMessagesIntoPacketsIfQueueExceedMessageBatchSizeProperty() throws EventHubClientFacadeException {
96+
// given
97+
int batchSize = 5;
98+
int totalLength = batchSize + 1;
99+
100+
when(eventHubProperties.getMessageBatchSize()).thenReturn(batchSize);
101+
when(eventHubProperties.getMaxSendAttempts()).thenReturn(10);
102+
when(eventHubProperties.getThrottlingDelay()).thenReturn(Duration.ofMillis(1));
103+
when(counters.getMesasageSentAttemptCounter()).thenReturn(counter);
104+
105+
LinkedList<LoMessage> messageQueue = getExampleMessageQueue(totalLength);
106+
107+
prepareService(messageQueue);
108+
109+
List<String> expectedMessages1 = (new LinkedList<>(messageQueue)).subList(0, batchSize)
110+
.stream()
111+
.map(LoMessage::message)
112+
.toList();
113+
List<String> expectedMessages2 = (new LinkedList<>(messageQueue)).subList(batchSize, totalLength)
114+
.stream()
115+
.map(LoMessage::message)
116+
.toList();
117+
118+
// when
119+
eventHubSender.send();
120+
121+
// then
122+
verify(eventHubClientFacade, times(1)).sendSync(expectedMessages1);
123+
verify(eventHubClientFacade, times(1)).sendSync(expectedMessages2);
124+
}
125+
126+
private LinkedList<LoMessage> getExampleMessageQueue(int batchSize) {
127+
return IntStream.range(1, batchSize + 1)
128+
.mapToObj(i -> new LoMessage(i, String.format("Message %d", i)))
129+
.collect(Collectors.toCollection(LinkedList::new));
130+
}
131+
132+
class IsListWithGivenSize implements ArgumentMatcher<List> {
133+
int size;
134+
135+
public IsListWithGivenSize(int size) {
136+
this.size = size;
137+
}
138+
139+
@Override
140+
public boolean matches(List list) {
141+
return list.size() == size;
142+
}
143+
}
144+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
2+
3+
import com.orange.lo.sdk.LOApiClient;
4+
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
5+
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;
6+
import org.junit.jupiter.api.BeforeEach;
7+
import org.junit.jupiter.api.Test;
8+
import org.junit.jupiter.api.extension.ExtendWith;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
11+
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
12+
import org.springframework.test.context.ActiveProfiles;
13+
import org.springframework.test.context.ContextConfiguration;
14+
import org.springframework.test.context.junit.jupiter.SpringExtension;
15+
16+
import static org.junit.jupiter.api.Assertions.*;
17+
18+
@ExtendWith({SpringExtension.class})
19+
@EnableConfigurationProperties(LoProperties.class)
20+
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
21+
@ActiveProfiles("unittest")
22+
class LoConfigTest {
23+
24+
private DataManagementFifoCallback dataManagementFifoCallback = (i, s) -> {
25+
26+
};
27+
private final DataManagementReconnectCallback dataManagementReconnectCallback = new DataManagementReconnectCallback() {
28+
29+
@Override
30+
public void connectComplete(boolean b, String s) {
31+
32+
}
33+
34+
@Override
35+
public void connectionLost(Throwable throwable) {
36+
37+
}
38+
};
39+
40+
@Autowired
41+
private LoProperties loProperties;
42+
private LoConfig loConfig;
43+
44+
@BeforeEach
45+
void setUp() {
46+
loConfig = new LoConfig(loProperties, dataManagementFifoCallback, dataManagementReconnectCallback);
47+
}
48+
49+
@Test
50+
void shouldInitLoApiClient() {
51+
LOApiClient loApiClient = loConfig.loApiClient();
52+
53+
assertNotNull(loApiClient);
54+
}
55+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
2+
3+
import com.orange.lo.sample.mqtt2eventhub.utils.ConnectorHealthActuatorEndpoint;
4+
import org.junit.jupiter.api.BeforeEach;
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.ExtendWith;
7+
import org.mockito.Mock;
8+
import org.mockito.junit.jupiter.MockitoExtension;
9+
10+
import static org.mockito.Mockito.times;
11+
import static org.mockito.Mockito.verify;
12+
13+
@ExtendWith(MockitoExtension.class)
14+
class LoMqttReconnectHandlerTest {
15+
16+
@Mock
17+
private ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
18+
private LoMqttReconnectHandler loMqttReconnectHandler;
19+
20+
@BeforeEach
21+
void setUp() {
22+
this.loMqttReconnectHandler = new LoMqttReconnectHandler(connectorHealthActuatorEndpoint);
23+
}
24+
25+
@Test
26+
void shouldChangeLoConnectionStausWhenConnectComplete() {
27+
loMqttReconnectHandler.connectComplete(false, "");
28+
29+
verify(connectorHealthActuatorEndpoint, times(1)).setLoConnectionStatus(true);
30+
}
31+
32+
@Test
33+
void shouldChangeLoConnectionStausWhenConnectionLost() {
34+
loMqttReconnectHandler.connectionLost(new Exception());
35+
36+
verify(connectorHealthActuatorEndpoint, times(1)).setLoConnectionStatus(false);
37+
}
38+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.orange.lo.sample.mqtt2eventhub.liveobjects;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
8+
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
9+
import org.springframework.test.context.ActiveProfiles;
10+
import org.springframework.test.context.ContextConfiguration;
11+
import org.springframework.test.context.junit.jupiter.SpringExtension;
12+
13+
import static org.junit.jupiter.api.Assertions.*;
14+
15+
@ExtendWith({SpringExtension.class})
16+
@EnableConfigurationProperties(LoProperties.class)
17+
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
18+
@ActiveProfiles("unittest")
19+
class LoPropertiesTest {
20+
21+
@Autowired
22+
private LoProperties loProperties;
23+
24+
@Test
25+
void shouldBindLoPropertiesFromYamlConfiguration() {
26+
Assertions.assertAll(
27+
() -> assertEquals("invalid.url.liveobjects.orange-business.com", this.loProperties.getHostname()),
28+
() -> assertEquals("test", this.loProperties.getApiKey()),
29+
() -> assertEquals("dev", this.loProperties.getTopic()),
30+
() -> assertEquals(30, this.loProperties.getKeepAliveIntervalSeconds()),
31+
() -> assertEquals(true, this.loProperties.getAutomaticReconnect()),
32+
() -> assertEquals(1, this.loProperties.getMessageQos()),
33+
() -> assertEquals("./temp/", this.loProperties.getMqttPersistenceDir()),
34+
() -> assertEquals(30000, this.loProperties.getConnectionTimeout())
35+
);
36+
}
37+
}

0 commit comments

Comments
 (0)