Skip to content

Commit b8ccc2a

Browse files
committed
CAMEL-22789: camel-core - Using bridgeErrorHandler=true can cause endless loop if triggered from onCompletion (such as camel-aws-s3) (#20476)
1 parent 951b79b commit b8ccc2a

File tree

2 files changed

+180
-10
lines changed

2 files changed

+180
-10
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.processor;
18+
19+
import java.util.Map;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.apache.camel.Component;
24+
import org.apache.camel.Consumer;
25+
import org.apache.camel.ContextTestSupport;
26+
import org.apache.camel.Endpoint;
27+
import org.apache.camel.Exchange;
28+
import org.apache.camel.Processor;
29+
import org.apache.camel.Producer;
30+
import org.apache.camel.builder.RouteBuilder;
31+
import org.apache.camel.support.DefaultComponent;
32+
import org.apache.camel.support.DefaultConsumer;
33+
import org.apache.camel.support.DefaultEndpoint;
34+
import org.apache.camel.support.SynchronizationAdapter;
35+
import org.junit.jupiter.api.Test;
36+
37+
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.junit.jupiter.api.Assertions.assertNotNull;
39+
40+
/**
41+
*
42+
*/
43+
public class DefaultConsumerUoWBridgeErrorHandlerTest extends ContextTestSupport {
44+
45+
protected final CountDownLatch latch = new CountDownLatch(1);
46+
47+
@Test
48+
public void testDefaultConsumerBridgeErrorHandler() throws Exception {
49+
getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
50+
getMockEndpoint("mock:dead").expectedBodiesReceived("Cannot complete");
51+
52+
latch.countDown();
53+
54+
assertMockEndpointsSatisfied();
55+
56+
Exception cause = getMockEndpoint("mock:dead").getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT,
57+
Exception.class);
58+
assertNotNull(cause);
59+
assertEquals("Forced error", cause.getMessage());
60+
}
61+
62+
@Override
63+
protected RouteBuilder createRouteBuilder() {
64+
// START SNIPPET: e1
65+
return new RouteBuilder() {
66+
@Override
67+
public void configure() {
68+
// register our custom component
69+
getContext().addComponent("my", new MyComponent());
70+
71+
// configure error handler
72+
errorHandler(deadLetterChannel("mock:dead"));
73+
74+
// configure the consumer to bridge with the Camel error
75+
// handler,
76+
// so the above error handler will trigger if exceptions also
77+
// occurs inside the consumer
78+
from("my:foo?bridgeErrorHandler=true").to("log:foo").to("mock:result");
79+
}
80+
};
81+
// END SNIPPET: e1
82+
}
83+
84+
public class MyComponent extends DefaultComponent {
85+
86+
@Override
87+
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) {
88+
return new MyEndpoint(uri, this);
89+
}
90+
}
91+
92+
public class MyEndpoint extends DefaultEndpoint {
93+
94+
public MyEndpoint(String endpointUri, Component component) {
95+
super(endpointUri, component);
96+
}
97+
98+
@Override
99+
public Producer createProducer() {
100+
return null;
101+
}
102+
103+
@Override
104+
public Consumer createConsumer(Processor processor) throws Exception {
105+
Consumer answer = new MyConsumer(this, processor);
106+
configureConsumer(answer);
107+
return answer;
108+
}
109+
110+
@Override
111+
public boolean isSingleton() {
112+
return true;
113+
}
114+
}
115+
116+
public class MyConsumer extends DefaultConsumer {
117+
118+
private int invoked;
119+
120+
public MyConsumer(Endpoint endpoint, Processor processor) {
121+
super(endpoint, processor);
122+
}
123+
124+
public void doSomething() {
125+
try {
126+
Exchange exchange = getEndpoint().createExchange();
127+
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
128+
@Override
129+
public void onComplete(Exchange exchange) {
130+
// trigger bridge error handler in UoW completion to test that this is not called again in endless loop
131+
getExceptionHandler().handleException("Cannot complete", exchange,
132+
new IllegalArgumentException("Forced error"));
133+
}
134+
});
135+
exchange.getIn().setBody("Hello World");
136+
getProcessor().process(exchange);
137+
138+
} catch (Exception e) {
139+
getExceptionHandler().handleException("Cannot process", e);
140+
}
141+
}
142+
143+
@Override
144+
protected void doStart() throws Exception {
145+
super.doStart();
146+
147+
Thread thread = new Thread() {
148+
@Override
149+
public void run() {
150+
try {
151+
// do not start before the mocks has been setup and is
152+
// ready
153+
latch.await(5, TimeUnit.SECONDS);
154+
doSomething();
155+
} catch (Exception e) {
156+
// ignore
157+
}
158+
}
159+
};
160+
thread.start();
161+
}
162+
}
163+
}

core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,29 +58,36 @@ public void handleException(String message, Throwable exception) {
5858

5959
@Override
6060
public void handleException(String message, Exchange exchange, Throwable exception) {
61+
Exchange copy;
6162
if (exchange == null) {
62-
exchange = consumer.getEndpoint().createExchange();
63+
copy = consumer.getEndpoint().createExchange();
64+
} else {
65+
// use a copy to as must be processed independently unit of work
66+
copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
6367
}
6468

6569
// set the caused exception
66-
exchange.setException(exception);
67-
exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exception);
70+
copy.setException(exception);
71+
copy.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, exception);
6872
// and the message
69-
exchange.getIn().setBody(message);
73+
copy.getIn().setBody(message);
7074
// mark as bridged
71-
exchange.setProperty(ExchangePropertyKey.ERRORHANDLER_BRIDGE, true);
75+
copy.setProperty(ExchangePropertyKey.ERRORHANDLER_BRIDGE, true);
7276
// and mark as redelivery exhausted as we cannot do redeliveries
73-
exchange.getExchangeExtension().setRedeliveryExhausted(true);
77+
copy.getExchangeExtension().setRedeliveryExhausted(true);
7478

7579
// wrap in UoW
7680
UnitOfWork uow = null;
7781
try {
78-
uow = consumer.createUoW(exchange);
79-
bridge.process(exchange);
82+
uow = consumer.createUoW(copy);
83+
// process synchronously
84+
bridge.process(copy);
8085
} catch (Exception e) {
81-
fallback.handleException("Error handling exception " + exception.getMessage(), exchange, e);
86+
fallback.handleException(
87+
"Error bridge handling existing exception " + exception.getMessage() + " due to: " + e.getMessage(), copy,
88+
e);
8289
} finally {
83-
UnitOfWorkHelper.doneUow(uow, exchange);
90+
UnitOfWorkHelper.doneUow(uow, copy);
8491
}
8592
}
8693
}

0 commit comments

Comments
 (0)