Skip to content

Commit 9212a36

Browse files
szlisieckiSzymon Lisiecki
andauthored
add manual message ack (#31)
Co-authored-by: Szymon Lisiecki <[email protected]>
1 parent 8b54ffc commit 9212a36

File tree

11 files changed

+115
-82
lines changed

11 files changed

+115
-82
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>com.github.DatavenueLiveObjects</groupId>
3030
<artifactId>FIFO_SDK_LiveObjects</artifactId>
31-
<version>v1.15</version>
31+
<version>v1.16</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>org.springframework.boot</groupId>

src/main/java/com/orange/lo/sample/lo2iothub/ApplicationConfig.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public void init() {
8888
azureIotHubList.forEach(azureIotHubProperties -> {
8989
try {
9090
LOG.debug("Initializing for {} ", azureIotHubProperties.getIotHostName());
91+
LoAdapter loAdapter = null;
92+
9193
IoTDeviceProvider ioTDeviceProvider = createIotDeviceProvider(azureIotHubProperties);
9294

9395
DevicesManager deviceClientManager = new DevicesManager(azureIotHubProperties, connectorHealthActuatorEndpoint, ioTDeviceProvider, counters);
@@ -100,23 +102,25 @@ public void init() {
100102
);
101103

102104
LOApiClientParameters loApiClientParameters = loApiClientParameters(liveObjectsProperties,
103-
azureIotHubProperties, iotHubAdapter);
105+
azureIotHubProperties, iotHubAdapter, loAdapter);
104106
LOApiClient loApiClient = new LOApiClient(loApiClientParameters);
107+
105108
connectorHealthActuatorEndpoint.addLoApiClient(loApiClient);
106109

107-
LoAdapter loAdapter = null;
110+
108111
boolean problemWithConnection = false;
109112
try {
110113
loAdapter = new LoAdapter(loApiClient, liveObjectsProperties.getPageSize(),
111-
groupRetryPolicy, deviceRetryPolicy);
114+
groupRetryPolicy, deviceRetryPolicy, liveObjectsProperties.getQos());
112115
loAdapter.connect();
113116
} catch (Exception e) {
114117
LOG.error("Problem with connection. Check LO credentials", e);
115118
problemWithConnection = true;
116119
}
117120

118-
LoCommandSender loCommandSender = new LoCommandSender(loApiClient, objectMapper, commandRetryPolicy);
121+
LoCommandSender loCommandSender = new LoCommandSender(loAdapter, objectMapper, commandRetryPolicy);
119122
deviceClientManager.setLoCommandSender(loCommandSender);
123+
deviceClientManager.setLoAdapter(loAdapter);
120124

121125
DeviceSynchronizationTask deviceSynchronizationTask = null;
122126
try {
@@ -152,7 +156,7 @@ private IoTDeviceProvider createIotDeviceProvider(AzureIotHubProperties azureIot
152156
}
153157

154158
private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProperties,
155-
AzureIotHubProperties azureIotHubProperties, IotHubAdapter iotHubAdapter) {
159+
AzureIotHubProperties azureIotHubProperties, IotHubAdapter iotHubAdapter, LoAdapter loAdapter) {
156160

157161
List<String> topics = Lists.newArrayList(azureIotHubProperties.getLoMessagesTopic());
158162
if (loProperties.isDeviceSynchronization()) {
@@ -162,12 +166,13 @@ private LOApiClientParameters loApiClientParameters(LiveObjectsProperties loProp
162166
.hostname(loProperties.getHostname())
163167
.apiKey(loProperties.getApiKey())
164168
.automaticReconnect(true)
169+
.manualAck(true)
165170
.messageQos(loProperties.getQos())
166171
.keepAliveIntervalSeconds(loProperties.getKeepAliveIntervalSeconds())
167172
.connectionTimeout(loProperties.getConnectionTimeout())
168173
.mqttPersistenceDataDir(loProperties.getMqttPersistenceDir())
169174
.topics(topics)
170-
.dataManagementMqttCallback(new MessageHandler(iotHubAdapter, counters))
175+
.dataManagementMqttCallback(new MessageHandler(iotHubAdapter, loAdapter, counters))
171176
.connectorType(loProperties.getConnectorType())
172177
.connectorVersion(getConnectorVersion())
173178
.build();

src/main/java/com/orange/lo/sample/lo2iothub/MessageHandler.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import com.orange.lo.sample.lo2iothub.azure.IotHubAdapter;
1111
import com.orange.lo.sample.lo2iothub.exceptions.DeviceSynchronizationException;
12+
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
1213
import com.orange.lo.sample.lo2iothub.utils.Counters;
1314
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
1415

@@ -31,57 +32,64 @@ public class MessageHandler implements DataManagementFifoCallback {
3132
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
3233

3334
private final IotHubAdapter iotHubAdapter;
35+
private final LoAdapter loAdapter;
3436
private final Counters counterProvider;
3537

36-
public MessageHandler(IotHubAdapter iotHubAdapter, Counters counterProvider) {
38+
public MessageHandler(IotHubAdapter iotHubAdapter, LoAdapter loAdapter, Counters counterProvider) {
3739
this.iotHubAdapter = iotHubAdapter;
40+
this.loAdapter = loAdapter;
3841
this.counterProvider = counterProvider;
3942
}
4043

41-
public void onMessage(String message) {
44+
@Override
45+
public void onMessage(int loMessageId, String message) {
4246
String messageType = getMessageType(message);
4347
LOG.info("Received message of the type: {}", messageType);
4448
switch (messageType) {
4549
case DATA_MESSAGE_TYPE:
46-
handleDataMessage(message);
50+
handleDataMessage(loMessageId, message);
4751
break;
4852
case DEVICE_CREATED_MESSAGE_TYPE:
49-
handleDeviceCreationEvent(message);
53+
handleDeviceCreationEvent(loMessageId, message);
5054
break;
5155
case DEVICE_DELETED_MESSAGE_TYPE:
52-
handleDeviceRemovalEvent(message);
56+
handleDeviceRemovalEvent(loMessageId, message);
5357
break;
5458
default:
5559
LOG.error("Unknown message type of message: {}", message);
5660
}
5761
}
5862

59-
private void handleDataMessage(String message) {
63+
private void handleDataMessage(int loMessageId, String message) {
6064
counterProvider.getMesasageReadCounter().increment();
6165
try {
6266
String sourceDeviceId = getSourceDeviceId(message);
63-
iotHubAdapter.sendMessage(sourceDeviceId, message);
67+
iotHubAdapter.sendMessage(sourceDeviceId, loMessageId, message);
6468
} catch (JSONException e) {
6569
LOG.error("Cannot read source device id from message", e);
6670
counterProvider.getMesasageSentFailedCounter().increment();
71+
loAdapter.sendMessageAck(loMessageId);
6772
} catch (DeviceSynchronizationException e) {
6873
LOG.error("Cannot send message to IoT Hub because device doesn't exist", e);
6974
counterProvider.getMesasageSentFailedCounter().increment();
75+
loAdapter.sendMessageAck(loMessageId);
7076
} catch (Exception e) {
7177
LOG.error("Cannot send message to IoT Hub", e);
7278
counterProvider.getMesasageSentFailedCounter().increment();
79+
loAdapter.sendMessageAck(loMessageId);
7380
}
7481
}
7582

76-
private void handleDeviceCreationEvent(String message) {
83+
private void handleDeviceCreationEvent(int loMessageId, String message) {
7784
Optional<String> deviceId = getDeviceId(message);
7885
deviceId.ifPresent(iotHubAdapter::createOrGetDeviceClientManager);
79-
86+
loAdapter.sendMessageAck(loMessageId);
8087
}
8188

82-
private void handleDeviceRemovalEvent(String message) {
89+
private void handleDeviceRemovalEvent(int loMessageId, String message) {
8390
Optional<String> deviceId = getDeviceId(message);
8491
deviceId.ifPresent(iotHubAdapter::deleteDevice);
92+
loAdapter.sendMessageAck(loMessageId);
8593
}
8694

8795
private static String getSourceDeviceId(String msg) throws JSONException {
@@ -105,4 +113,4 @@ private static String getMessageType(String msg) {
105113
return UNKNOWN_MESSAGE_TYPE;
106114
}
107115
}
108-
}
116+
}

src/main/java/com/orange/lo/sample/lo2iothub/azure/DeviceClientManager.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;
1313
import com.microsoft.azure.sdk.iot.device.transport.TransportException;
1414
import com.microsoft.azure.sdk.iot.service.registry.Device;
15+
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
1516
import com.orange.lo.sample.lo2iothub.lo.LoCommandSender;
1617
import com.orange.lo.sample.lo2iothub.utils.Counters;
1718
import net.jodah.failsafe.Failsafe;
@@ -35,12 +36,14 @@ public class DeviceClientManager implements MessageCallback, IotHubConnectionSta
3536
private final LoCommandSender loCommandSender;
3637
private MultiplexingClientManager multiplexingClientManager;
3738
private final AzureIotHubProperties azureIotHubProperties;
39+
private final LoAdapter loAdapter;
3840
private final IoTDeviceProvider ioTDeviceProvider;
3941

4042
private Counters counterProvider;
4143

42-
public DeviceClientManager(Device device, AzureIotHubProperties azureIotHubProperties, LoCommandSender loCommandSender, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) {
44+
public DeviceClientManager(Device device, AzureIotHubProperties azureIotHubProperties, LoCommandSender loCommandSender, LoAdapter loAdapter, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) {
4345
this.azureIotHubProperties = azureIotHubProperties;
46+
this.loAdapter = loAdapter;
4447
this.ioTDeviceProvider = ioTDeviceProvider;
4548
this.deviceClient = new DeviceClient(getConnectionString(device, azureIotHubProperties.getIotHostName()), IotHubClientProtocol.AMQPS);
4649
this.deviceClient.setMessageCallback(this, null);
@@ -77,9 +80,9 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange
7780
}
7881
}
7982

80-
public void sendMessage(String loMessage) {
83+
public void sendMessage(int loMessageId, String loMessage) {
8184
Message message = new Message(loMessage);
82-
sendMessage(message);
85+
sendMessage(loMessageId, message);
8386
}
8487

8588
private void reestablishSessionAsync(Throwable throwable) {
@@ -114,7 +117,7 @@ private void reestablishSessionAsync(Throwable throwable) {
114117
}).start();
115118
}
116119

117-
private void sendMessage(Message message) {
120+
private void sendMessage(int loMessageId, Message message) {
118121
Failsafe.with(
119122
new RetryPolicy<IotHubStatusCode>()
120123
.withMaxAttempts(azureIotHubProperties.getMessageSendMaxAttempts())
@@ -129,10 +132,12 @@ private void sendMessage(Message message) {
129132
.onSuccess(r -> {
130133
LOG.debug("IoT Hub responded to message with id {} from {} with status {}", message.getMessageId(), getDeviceId(), r.getResult().name());
131134
counterProvider.getMesasageSentCounter().increment();
135+
loAdapter.sendMessageAck(loMessageId);
132136
})
133137
.onFailure(r -> {
134138
LOG.error("Cannot send message with id " + message.getMessageId() + " from " + getDeviceId(), r.getFailure());
135139
counterProvider.getMesasageSentFailedCounter().increment();
140+
loAdapter.sendMessageAck(loMessageId);
136141
})
137142
).getAsyncExecution(execution -> {
138143
counterProvider.getMesasageSentAttemptCounter().increment();

src/main/java/com/orange/lo/sample/lo2iothub/azure/DevicesManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
1111
import com.microsoft.azure.sdk.iot.service.registry.Device;
12+
import com.orange.lo.sample.lo2iothub.lo.LoAdapter;
1213
import com.orange.lo.sample.lo2iothub.lo.LoCommandSender;
1314

1415
import java.lang.invoke.MethodHandles;
@@ -36,6 +37,7 @@ public class DevicesManager {
3637
private final ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint;
3738
private final IoTDeviceProvider ioTDeviceProvider;
3839
private Counters counterProvider;
40+
private LoAdapter loAdapter;
3941

4042
public DevicesManager(AzureIotHubProperties azureIotHubProperties, ConnectorHealthActuatorEndpoint connectorHealthActuatorEndpoint, IoTDeviceProvider ioTDeviceProvider, Counters counterProvider) throws IotHubClientException {
4143
this.azureIotHubProperties = azureIotHubProperties;
@@ -49,6 +51,10 @@ public void setLoCommandSender(LoCommandSender loCommandSender) {
4951
this.loCommandSender = loCommandSender;
5052
}
5153

54+
public void setLoAdapter(LoAdapter loAdapter) {
55+
this.loAdapter = loAdapter;
56+
}
57+
5258
public synchronized boolean containsDeviceClient(String deviceClientId) {
5359
for (MultiplexingClientManager multiplexingClientManager : multiplexingClientManagerList) {
5460
if (multiplexingClientManager.deviceExisted(deviceClientId)) {
@@ -59,7 +65,7 @@ public synchronized boolean containsDeviceClient(String deviceClientId) {
5965
}
6066

6167
public void createDeviceClient(Device device) {
62-
DeviceClientManager deviceClientManager = new DeviceClientManager(device, azureIotHubProperties, loCommandSender, ioTDeviceProvider, counterProvider);
68+
DeviceClientManager deviceClientManager = new DeviceClientManager(device, azureIotHubProperties, loCommandSender, loAdapter, ioTDeviceProvider, counterProvider);
6369
MultiplexingClientManager freeMultiplexingClientManager = getFreeMultiplexingClientManager();
6470
deviceClientManager.setMultiplexingClientManager(freeMultiplexingClientManager);
6571
freeMultiplexingClientManager.registerDeviceClientManager(deviceClientManager);

src/main/java/com/orange/lo/sample/lo2iothub/azure/IotHubAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public IotHubAdapter(IoTDeviceProvider ioTDeviceProvider, DevicesManager deviceC
3636
this.connectorHealthActuatorEndpoint = connectorHealthActuatorEndpoint;
3737
}
3838

39-
public void sendMessage(String loClientId, String message) {
39+
public void sendMessage(String loClientId, int loMessageId, String message) {
4040
DeviceClientManager deviceClientManager = createOrGetDeviceClientManager(loClientId);
41-
deviceClientManager.sendMessage(message);
41+
deviceClientManager.sendMessage(loMessageId, message);
4242
}
4343

4444
public void deleteDevice(String deviceId) {

0 commit comments

Comments
 (0)