Skip to content

Commit db85e32

Browse files
authored
Merge pull request #177 from cinovo/messageretryfix
Messageretry fix
2 parents 181b79a + b6dcdf0 commit db85e32

File tree

6 files changed

+63
-64
lines changed

6 files changed

+63
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* Swagger 2.2.40
1818
* Velocity Engine 2.4.1
1919
* Fixing DaemonExceptionMapper
20+
* Fixing problem with retry messages in Interconnect
2021
* Better error handling in CloudconductorPropertyProvider
2122
* Fixed vulnerabilities: CVE-2024-13009(Jetty), CVE-2025-23184(Apache CXF), CVE-2024-57699 (Json-smart),CVE-2025-27533 (ActiveMQ)
2223

interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/handler/ADaemonMessageHandler.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,18 @@ private Message decryptIfNecessary(ICryptoService cryptoService, Message message
149149

150150
private void handleWithReply(IDaemonHandler handler, InterconnectResponseContext context) throws Exception {
151151
try {
152-
final InterconnectObject responseIco = this.handleRequest(handler, context.getCreateResponseMethod(),
153-
context.getReceivedContext().getRequestIco());
152+
final InterconnectObject responseIco = this.handleRequest(handler, context.getCreateResponseMethod(), context.getReceivedContext().getRequestIco());
154153
context.setResponseICO(responseIco);
155154
if (this.duration(context) == HandlingDurationType.TIMEOUT) {
156155
return;
157156
}
158157
this.reply(context);
159158
} catch (final DaemonError e) {
160-
this.getLogger().debug("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" +
161-
context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " +
162-
de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e);
159+
if (e.getNumber().get() < 0) {
160+
this.getLogger().error("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" + context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e);
161+
} else {
162+
this.getLogger().debug("DaemonError for " + context.getCreateResponseMethod().getMethod().getName() + "(" + context.getReceivedContext().getIcoClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e);
163+
}
163164
this.sendErrorResponse(e, context);
164165
}
165166
}
@@ -175,13 +176,9 @@ private void sendErrorResponse(DaemonError e, InterconnectResponseContext contex
175176
}
176177

177178
private void updateThreadContext(InterconnectResponseContext context) throws Exception {
178-
de.taimos.dvalin.interconnect.model.InterconnectContext.setUuid(
179-
ADaemonMessageHandler.getUuid(context.getReceivedMessage(),
180-
context.getReceivedContext().getIcoClass()));
181-
de.taimos.dvalin.interconnect.model.InterconnectContext.setDeliveryCount(
182-
this.getDeliveryCount(context.getReceivedMessage()));
183-
de.taimos.dvalin.interconnect.model.InterconnectContext.setRedelivered(
184-
context.getReceivedMessage().getJMSRedelivered());
179+
de.taimos.dvalin.interconnect.model.InterconnectContext.setUuid(ADaemonMessageHandler.getUuid(context.getReceivedMessage(), context.getReceivedContext().getIcoClass()));
180+
de.taimos.dvalin.interconnect.model.InterconnectContext.setDeliveryCount(this.getDeliveryCount(context.getReceivedMessage()));
181+
de.taimos.dvalin.interconnect.model.InterconnectContext.setRedelivered(context.getReceivedMessage().getJMSRedelivered());
185182
Class<? extends IVO> ivoClass;
186183
if (context.getReceivedContext() instanceof IVO) {
187184
ivoClass = ADaemonMessageHandler.uncheckedCast(context.getResponseICO());
@@ -192,20 +189,15 @@ private void updateThreadContext(InterconnectResponseContext context) throws Exc
192189
private static DaemonMethod getDaemonMethod(RegistryEntry registryEntry, InterconnectResponseContext context) throws Exception {
193190
final DaemonMethod method = registryEntry.getMethod();
194191
if (method.isSecure() != context.getReceivedContext().isSecure()) {
195-
throw new Exception(
196-
"Insecure call (is " + context.getReceivedContext().isSecure() + " should be " + method.isSecure() +
197-
") for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " +
198-
context.getReceivedMessage().getJMSReplyTo());
192+
throw new Exception("Insecure call (is " + context.getReceivedContext().isSecure() + " should be " + method.isSecure() + ") for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " + context.getReceivedMessage().getJMSReplyTo());
199193
}
200194
return method;
201195
}
202196

203197
private RegistryEntry getRegistryEntry(InterconnectResponseContext context) throws Exception {
204198
final RegistryEntry registryEntry = this.registry.get(context.getReceivedContext().getIcoClass());
205199
if (registryEntry == null) {
206-
throw new Exception(
207-
"No registered method found for " + context.getReceivedContext().getIcoClass().getSimpleName() +
208-
" from " + context.getReceivedMessage().getJMSReplyTo());
200+
throw new Exception("No registered method found for " + context.getReceivedContext().getIcoClass().getSimpleName() + " from " + context.getReceivedMessage().getJMSReplyTo());
209201
}
210202
return registryEntry;
211203
}
@@ -241,8 +233,7 @@ private void logInvoke(InterconnectResponseContext context) {
241233
.append("(").append(context.getReceivedContext().getIcoClass().getSimpleName()).append(")");
242234
if (context.getReceivedContext().getRequestIco() instanceof IPageable) {
243235
sbInvokeLog //
244-
.append(" at Page ").append(((IPageable) context.getReceivedContext().getRequestIco()).getOffset())
245-
.append(";").append(((IPageable) context.getReceivedContext().getRequestIco()).getLimit());
236+
.append(" at Page ").append(((IPageable) context.getReceivedContext().getRequestIco()).getOffset()).append(";").append(((IPageable) context.getReceivedContext().getRequestIco()).getLimit());
246237
}
247238
sbInvokeLog.append(" with ").append(de.taimos.dvalin.interconnect.model.InterconnectContext.getContext());
248239
this.getLogger().info(sbInvokeLog.toString());
@@ -267,15 +258,12 @@ private int getDeliveryCount(Message message) throws JMSException {
267258
private static UUID getUuid(Message message, Class<? extends InterconnectObject> icoClass) throws Exception {
268259
final String requestUUID = message.getStringProperty(InterconnectContext.HEADER_REQUEST_UUID);
269260
if (requestUUID == null) {
270-
throw new Exception("No request UUID found in message with " + icoClass.getSimpleName() + " from " +
271-
message.getJMSReplyTo());
261+
throw new Exception("No request UUID found in message with " + icoClass.getSimpleName() + " from " + message.getJMSReplyTo());
272262
}
273263
try {
274264
return UUID.fromString(requestUUID);
275265
} catch (final IllegalArgumentException e) {
276-
throw new Exception(
277-
"No valid request UUID " + requestUUID + " message with " + icoClass.getSimpleName() + " from " +
278-
message.getJMSReplyTo());
266+
throw new Exception("No valid request UUID " + requestUUID + " message with " + icoClass.getSimpleName() + " from " + message.getJMSReplyTo());
279267
}
280268
}
281269

@@ -314,9 +302,7 @@ private InterconnectObject handleRequest(final IDaemonHandler handler, final Dae
314302
throw new IdemponentRetryException(targetException);
315303
}
316304

317-
this.getLogger().error(
318-
"Exception in non-idempotent " + method.getMethod().getName() + "(" + ico.getClass().getSimpleName() +
319-
")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e);
305+
this.getLogger().error("Exception in non-idempotent " + method.getMethod().getName() + "(" + ico.getClass().getSimpleName() + ")" + " with " + de.taimos.dvalin.interconnect.model.InterconnectContext.getContext(), e);
320306
throw new DaemonError(FrameworkErrors.FRAMEWORK_ERROR, targetException);
321307
} catch (final Exception e) {
322308
throw new RuntimeException(e);

interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/jms/InterconnectMessageSender.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import de.taimos.dvalin.interconnect.core.daemon.exceptions.UnexpectedTypeException;
2727
import de.taimos.dvalin.interconnect.core.daemon.model.InterconnectContext;
2828
import de.taimos.dvalin.interconnect.core.daemon.util.DaemonExceptionMapper;
29+
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
30+
import de.taimos.dvalin.interconnect.core.exceptions.SerializationException;
31+
import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException;
2932
import de.taimos.dvalin.interconnect.model.FutureImpl;
3033
import de.taimos.dvalin.interconnect.model.InterconnectList;
3134
import de.taimos.dvalin.interconnect.model.InterconnectMapper;
@@ -35,11 +38,10 @@
3538
import de.taimos.dvalin.interconnect.model.service.DaemonErrorNumber;
3639
import de.taimos.dvalin.interconnect.model.service.DaemonScanner;
3740
import de.taimos.dvalin.jms.IJmsConnector;
41+
import de.taimos.dvalin.jms.exceptions.CommunicationFailureException;
42+
import de.taimos.dvalin.jms.exceptions.CommunicationFailureException.CommunicationError;
3843
import de.taimos.dvalin.jms.exceptions.CreationException;
3944
import de.taimos.dvalin.jms.exceptions.CreationException.Source;
40-
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
41-
import de.taimos.dvalin.interconnect.core.exceptions.SerializationException;
42-
import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException;
4345
import de.taimos.dvalin.jms.model.JmsResponseContext;
4446
import org.slf4j.Logger;
4547
import org.slf4j.LoggerFactory;
@@ -100,13 +102,26 @@ public void sendRequest(InterconnectContext interconnectContext) throws DaemonEr
100102

101103
private boolean checkForRetry(InterconnectContext so, InfrastructureException e) throws TimeoutException {
102104
if (!so.isIdempotent()) {
105+
this.logger.warn("No message retry due to missing idempotency.");
103106
return false;
104107
}
105-
if (!(e instanceof CreationException || e instanceof TimeoutException)) {
106-
return false;
108+
109+
if (e instanceof TimeoutException) {
110+
return true;
107111
}
108-
return !(e instanceof CreationException) ||
109-
Source.DESTINATION.equals(((CreationException) e).getExceptionSource());
112+
113+
if (e instanceof CreationException) {
114+
return Source.DESTINATION.equals(((CreationException) e).getExceptionSource());
115+
}
116+
117+
if (e instanceof CommunicationFailureException) {
118+
if (CommunicationError.SEND.equals(((CommunicationFailureException) e).getCommunicationError())) {
119+
this.logger.warn("Retrying message send because of: {}", FrameworkErrors.SEND_ERROR);
120+
return true;
121+
}
122+
}
123+
124+
return false;
110125
}
111126

112127
private void sendRequestRetry(InterconnectContext so) throws DaemonError, TimeoutException {
@@ -162,10 +177,12 @@ private <R> R request(InterconnectContext requestObject, Class<R> responseClazz)
162177
try {
163178
responseObject = this.jmsConnector.request(requestObject);
164179
} catch (SerializationException e) {
165-
throw new RuntimeException(e);
180+
DaemonExceptionMapper.mapAndThrow(e);
166181
} catch (InfrastructureException e) {
167182
if (this.checkForRetry(requestObject, e)) {
168183
responseObject = this.sendSyncRequestRetry(requestObject);
184+
} else {
185+
DaemonExceptionMapper.mapAndThrow(e);
169186
}
170187
}
171188

interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/model/InterconnectContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package de.taimos.dvalin.interconnect.core.daemon.model;
22

33
import com.google.common.base.Preconditions;
4+
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
45
import de.taimos.dvalin.interconnect.model.InterconnectMapper;
56
import de.taimos.dvalin.interconnect.model.InterconnectObject;
6-
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
77
import de.taimos.dvalin.jms.model.JmsContext;
88
import de.taimos.dvalin.jms.model.JmsTarget;
99

@@ -150,13 +150,13 @@ public InterconnectContext createResponseContext(InterconnectObject responseICO)
150150
return new InterconnectContextBuilder(this)//
151151
.withDestination(this.getReplyToDestination()) //
152152
.withRequestICO(responseICO) //
153-
.withIdempotent(false) //
153+
.withIdempotent(true) //
154154
.withTarget(JmsTarget.DESTINATION).build();
155155
}
156156
return new InterconnectContextBuilder(this)//
157157
.withDestinationName(this.getReplyToQueueName()) //
158158
.withRequestICO(responseICO) //
159-
.withIdempotent(false) //
159+
.withIdempotent(true) //
160160
.withTarget(JmsTarget.QUEUE).build();
161161
}
162162
}

interconnect/core/src/main/java/de/taimos/dvalin/interconnect/core/daemon/util/DaemonExceptionMapper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
import de.taimos.dvalin.interconnect.core.daemon.exceptions.FrameworkErrors;
44
import de.taimos.dvalin.interconnect.core.daemon.exceptions.UnexpectedTypeException;
5+
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
6+
import de.taimos.dvalin.interconnect.core.exceptions.MessageCryptoException;
7+
import de.taimos.dvalin.interconnect.core.exceptions.SerializationException;
8+
import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException;
59
import de.taimos.dvalin.interconnect.model.service.DaemonError;
610
import de.taimos.dvalin.interconnect.model.service.DaemonErrorNumber;
711
import de.taimos.dvalin.jms.exceptions.CommunicationFailureException;
812
import de.taimos.dvalin.jms.exceptions.CommunicationFailureException.CommunicationError;
913
import de.taimos.dvalin.jms.exceptions.CreationException;
1014
import de.taimos.dvalin.jms.exceptions.CreationException.Source;
11-
import de.taimos.dvalin.interconnect.core.exceptions.InfrastructureException;
12-
import de.taimos.dvalin.interconnect.core.exceptions.MessageCryptoException;
13-
import de.taimos.dvalin.interconnect.core.exceptions.SerializationException;
14-
import de.taimos.dvalin.interconnect.core.exceptions.TimeoutException;
1515

1616
/**
1717
* Copyright 2024 Cinovo AG<br>
@@ -77,10 +77,10 @@ private static Exception handleInfrastructureException(InfrastructureException e
7777

7878
private static Exception handleCommunicationFailureException(CommunicationFailureException e) {
7979
if (CommunicationError.SEND.equals(e.getCommunicationError())) {
80-
return new DaemonError(FrameworkErrors.SEND_ERROR, e);
80+
return new DaemonError(FrameworkErrors.SEND_ERROR, e.getCause());
8181
}
8282
if (CommunicationError.RECEIVE.equals(e.getCommunicationError())) {
83-
return new DaemonError(FrameworkErrors.RECEIVE_ERROR, e);
83+
return new DaemonError(FrameworkErrors.RECEIVE_ERROR, e.getCause());
8484
}
8585
if (CommunicationError.INVALID_RESPONSE.equals(e.getCommunicationError())) {
8686
return new DaemonError(FrameworkErrors.INVALID_RESPONSE_ERROR, e);

0 commit comments

Comments
 (0)