Skip to content

Commit f1c50b3

Browse files
gnodetclaude
andcommitted
Fix ContextValue scoping issues with KameletProcessor and JDK 25 compilation
The ContextValue abstraction properly scopes createRoute/createProcessor ThreadLocals, but KameletEndpoint.doInit() relied on a leaked ThreadLocal value from a bug in the old createRoute(null) which cleared isSetupRoutes instead of isCreateRoute. Fix by capturing the route/processor context in KameletReifier (within the createProcessor scope) and passing them to KameletProcessor, which restores them during doInit() so the endpoint can inherit error handlers correctly. Also fix JDK 25 ScopedValue.Carrier API: use call() instead of get(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5b8a237 commit f1c50b3

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,25 @@ public class KameletProcessor extends BaseProcessorSupport
4343

4444
private final String name;
4545
private final AsyncProcessor processor;
46+
private final String parentRouteId;
47+
private final String parentProcessorId;
4648
private KameletProducer producer;
4749
private KameletComponent component;
4850
private CamelContext camelContext;
4951
private String id;
5052
private String routeId;
5153

5254
public KameletProcessor(CamelContext camelContext, String name, Processor processor) throws Exception {
55+
this(camelContext, name, processor, null, null);
56+
}
57+
58+
public KameletProcessor(CamelContext camelContext, String name, Processor processor,
59+
String parentRouteId, String parentProcessorId) throws Exception {
5360
this.camelContext = camelContext;
5461
this.name = name;
5562
this.processor = AsyncProcessorConverterHelper.convert(processor);
63+
this.parentRouteId = parentRouteId;
64+
this.parentProcessorId = parentProcessorId;
5665
}
5766

5867
@ManagedAttribute(description = "Kamelet name (templateId/routeId?options)")
@@ -118,7 +127,25 @@ public String getTraceLabel() {
118127
@Override
119128
protected void doInit() throws Exception {
120129
this.component = camelContext.getComponent("kamelet", KameletComponent.class);
121-
this.producer = (KameletProducer) camelContext.getEndpoint("kamelet://" + name).createAsyncProducer();
130+
131+
// set the route/processor context so the KameletEndpoint.doInit() can pick them up
132+
// these were captured during construction (within the createProcessor scope)
133+
if (parentRouteId != null) {
134+
camelContext.getCamelContextExtension().createRoute(parentRouteId);
135+
}
136+
if (parentProcessorId != null) {
137+
camelContext.getCamelContextExtension().createProcessor(parentProcessorId);
138+
}
139+
try {
140+
this.producer = (KameletProducer) camelContext.getEndpoint("kamelet://" + name).createAsyncProducer();
141+
} finally {
142+
if (parentRouteId != null) {
143+
camelContext.getCamelContextExtension().createRoute(null);
144+
}
145+
if (parentProcessorId != null) {
146+
camelContext.getCamelContextExtension().createProcessor(null);
147+
}
148+
}
122149

123150
ServiceHelper.initService(processor, producer);
124151

components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,14 @@ public Processor createProcessor() throws Exception {
4040
// wrap in uow
4141
String outputId = definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class));
4242
final Processor childProcessor = processor;
43+
// capture route/processor context now (within createProcessor scope) for later use by KameletProcessor
44+
final String parentRouteId = camelContext.getCamelContextExtension().getCreateRoute();
4345
return camelContext.getCamelContextExtension().createProcessor(outputId, () -> {
44-
Processor answer = new KameletProcessor(camelContext, parseString(definition.getName()), childProcessor);
46+
final String parentProcessorId = camelContext.getCamelContextExtension().getCreateProcessor();
47+
Processor answer
48+
= new KameletProcessor(
49+
camelContext, parseString(definition.getName()), childProcessor,
50+
parentRouteId, parentProcessorId);
4551
if (answer instanceof DisabledAware da) {
4652
da.setDisabled(isDisabled(camelContext, definition));
4753
}

core/camel-util/src/main/java25/org/apache/camel/util/concurrent/ContextValueFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ static <T> ContextValue<T> newThreadLocal(String name, Supplier<T> supplier) {
8181
*/
8282
static <T, R> R where(ContextValue<T> key, T value, Supplier<R> operation) {
8383
if (key instanceof ScopedValueContextValue<T> svKey) {
84-
// In JDK 25+, ScopedValue.where() returns a Carrier that has get() method
85-
return ScopedValue.where(svKey.scopedValue, value).get(operation);
84+
return ScopedValue.where(svKey.scopedValue, value).call(operation::get);
8685
} else if (key instanceof ThreadLocalContextValue<T> tlKey) {
8786
T oldValue = tlKey.get();
8887
try {

0 commit comments

Comments
 (0)