Skip to content

Commit 479f5b2

Browse files
committed
chore(componets): make TraceProcessor async
1 parent 7d46e88 commit 479f5b2

File tree

1 file changed

+47
-10
lines changed

1 file changed

+47
-10
lines changed

components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
*/
1717
package org.apache.camel.telemetry;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.apache.camel.AsyncCallback;
1922
import org.apache.camel.CamelContext;
2023
import org.apache.camel.Exchange;
2124
import org.apache.camel.NamedNode;
2225
import org.apache.camel.Processor;
2326
import org.apache.camel.spi.InterceptStrategy;
27+
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
2428
import org.apache.camel.support.processor.DelegateAsyncProcessor;
2529

2630
/**
@@ -39,32 +43,65 @@ public Processor wrapProcessorInInterceptors(
3943
CamelContext camelContext,
4044
NamedNode processorDefinition, Processor target, Processor nextTarget)
4145
throws Exception {
42-
return new DelegateAsyncProcessor(new TraceProcessor(target, processorDefinition));
46+
return new TraceProcessor(target, processorDefinition);
4347
}
4448

45-
private class TraceProcessor implements Processor {
49+
private class TraceProcessor extends DelegateAsyncProcessor {
4650
private final NamedNode processorDefinition;
47-
private final Processor target;
4851

4952
public TraceProcessor(Processor target, NamedNode processorDefinition) {
50-
this.target = target;
53+
super(target);
5154
this.processorDefinition = processorDefinition;
5255
}
5356

5457
@Override
5558
public void process(Exchange exchange) throws Exception {
56-
String processor = processorDefinition.getId() + "-" + processorDefinition.getShortName();
57-
if (tracer.isTraceProcessors() && !tracer.exclude(processor, exchange.getContext())) {
58-
tracer.beginProcessorSpan(exchange, processor);
59+
String processorName = processorDefinition.getId() + "-" + processorDefinition.getShortName();
60+
if (tracer.isTraceProcessors() && !tracer.exclude(processorName, exchange.getContext())) {
61+
tracer.beginProcessorSpan(exchange, processorName);
5962
try {
60-
target.process(exchange);
63+
processor.process(exchange);
6164
} finally {
62-
tracer.endProcessorSpan(exchange, processor);
65+
tracer.endProcessorSpan(exchange, processorName);
6366
}
6467
} else {
6568
// We must always execute this
66-
target.process(exchange);
69+
processor.process(exchange);
70+
}
71+
}
72+
73+
@Override
74+
public boolean process(Exchange exchange, AsyncCallback callback) {
75+
String processorName = processorDefinition.getId() + "-" + processorDefinition.getShortName();
76+
boolean isTraceProcessor = tracer.isTraceProcessors() && !tracer.exclude(processorName, exchange.getContext());
77+
if (isTraceProcessor) {
78+
try {
79+
tracer.beginProcessorSpan(exchange, processorName);
80+
} catch (Exception e) {
81+
exchange.setException(e);
82+
}
6783
}
84+
return processor.process(exchange, doneSync -> {
85+
try {
86+
callback.done(doneSync);
87+
} finally {
88+
if (isTraceProcessor) {
89+
try {
90+
tracer.endProcessorSpan(exchange, processorName);
91+
} catch (Exception e) {
92+
exchange.setException(e);
93+
}
94+
}
95+
}
96+
});
97+
}
98+
99+
@Override
100+
public CompletableFuture<Exchange> processAsync(Exchange exchange) {
101+
AsyncCallbackToCompletableFutureAdapter<Exchange> callback
102+
= new AsyncCallbackToCompletableFutureAdapter<>(exchange);
103+
process(exchange, callback);
104+
return callback.getFuture();
68105
}
69106
}
70107

0 commit comments

Comments
 (0)