11package io .opentelemetry .contrib .messaging .wrappers ;
22
33import io .opentelemetry .api .OpenTelemetry ;
4+ import io .opentelemetry .api .common .Attributes ;
5+ import io .opentelemetry .api .common .AttributesBuilder ;
46import io .opentelemetry .api .trace .Span ;
57import io .opentelemetry .api .trace .SpanBuilder ;
68import io .opentelemetry .api .trace .Tracer ;
911import io .opentelemetry .context .propagation .TextMapGetter ;
1012import io .opentelemetry .context .propagation .TextMapPropagator ;
1113import io .opentelemetry .contrib .messaging .wrappers .semconv .MessagingProcessRequest ;
12- import io .opentelemetry .contrib . messaging . wrappers . semconv . MessagingProcessResponse ;
14+ import io .opentelemetry .instrumentation . api . instrumenter . AttributesExtractor ;
1315
16+ import javax .annotation .Nullable ;
1417import java .util .List ;
15- import java .util .concurrent .Callable ;
16- import java .util .logging .Level ;
17- import java .util .logging .Logger ;
1818
19- public class MessagingProcessWrapper <REQUEST extends MessagingProcessRequest , RESPONSE extends MessagingProcessResponse <?>> {
20-
21- private static final Logger LOG = Logger .getLogger (MessagingProcessWrapper .class .getName ());
19+ public class MessagingProcessWrapper <REQUEST extends MessagingProcessRequest > {
2220
2321 private static final String INSTRUMENTATION_SCOPE = "messaging-process-wrapper" ;
2422
@@ -32,94 +30,57 @@ public class MessagingProcessWrapper<REQUEST extends MessagingProcessRequest, RE
3230
3331 private final TextMapGetter <REQUEST > textMapGetter ;
3432
35- private final List <MessagingSpanCustomizer <REQUEST , RESPONSE >> spanCustomizers ;
33+ // no attributes need to be extracted from responses in process operations
34+ private final List <AttributesExtractor <REQUEST , Void >> attributesExtractors ;
3635
37- public Runnable wrap (REQUEST request , Runnable runnable ) {
38- return () -> {
39- Span span = handleStart (request );
40- Scope scope = span .makeCurrent ();
36+ public static <REQUEST extends MessagingProcessRequest > DefaultMessagingProcessWrapperBuilder <REQUEST > defaultBuilder () {
37+ return new DefaultMessagingProcessWrapperBuilder <>();
38+ }
4139
42- try {
43- runnable .run ();
44- } catch (Throwable t ) {
45- handleEnd (span , request , null , t );
46- scope .close ();
47- throw t ;
48- }
40+ public <E extends Throwable > void doProcess (REQUEST request , ThrowingRunnable <E > runnable ) throws E {
41+ Span span = handleStart (request );
4942
50- handleEnd (span , request , null , null );
51- scope .close ();
52- };
53- }
43+ try (Scope scope = span .makeCurrent ()) {
44+ runnable .run ();
45+ } catch (Throwable t ) {
46+ handleEnd (span , request , t );
47+ throw t ;
48+ }
5449
55- public <R > Callable <R > wrap (REQUEST request , Callable <R > callable ) {
56- return () -> {
57- Span span = handleStart (request );
58- Scope scope = span .makeCurrent ();
59- RESPONSE response = null ;
60-
61- R result = null ;
62- try {
63- result = callable .call ();
64- if (result instanceof MessagingProcessResponse ) {
65- response = (RESPONSE ) result ;
66- }
67- } catch (Throwable t ) {
68- handleEnd (span , request , response , t );
69- scope .close ();
70- throw t ;
71- }
72-
73- handleEnd (span , request , response , null );
74- scope .close ();
75- return result ;
76- };
50+ handleEnd (span , request , null );
7751 }
7852
79- public <R > R doProcess (REQUEST request , Callable < R > process ) throws Exception {
53+ public <R , E extends Throwable > R doProcess (REQUEST request , ThrowingSupplier < R , E > supplier ) throws E {
8054 Span span = handleStart (request );
81- Scope scope = span .makeCurrent ();
82- RESPONSE response = null ;
8355
8456 R result = null ;
85- try {
86- result = process .call ();
87- if (result instanceof MessagingProcessResponse ) {
88- response = (RESPONSE ) result ;
89- }
57+ try (Scope scope = span .makeCurrent ()) {
58+ result = supplier .get ();
9059 } catch (Throwable t ) {
91- handleEnd (span , request , response , t );
92- scope .close ();
60+ handleEnd (span , request , t );
9361 throw t ;
9462 }
9563
96- // noop response by default
97- handleEnd (span , request , response , null );
98- scope .close ();
64+ handleEnd (span , request , null );
9965 return result ;
10066 }
10167
10268 protected Span handleStart (REQUEST request ) {
10369 Context context = this .textMapPropagator .extract (Context .current (), request , this .textMapGetter );
10470 SpanBuilder spanBuilder = this .tracer .spanBuilder (getDefaultSpanName (request .getDestination ()));
10571 spanBuilder .setParent (context );
106- for (MessagingSpanCustomizer <REQUEST , RESPONSE > customizer : spanCustomizers ) {
107- try {
108- context = customizer .onStart (spanBuilder , context , request );
109- } catch (Exception e ) {
110- LOG .log (Level .WARNING , "Exception occurred while customizing span on start." , e );
111- }
72+
73+ AttributesBuilder builder = Attributes .builder ();
74+ for (AttributesExtractor <REQUEST , Void > extractor : this .attributesExtractors ) {
75+ extractor .onStart (builder , context , request );
11276 }
113- return spanBuilder .startSpan ();
77+ return spanBuilder .setAllAttributes ( builder . build ()). startSpan ();
11478 }
11579
116- protected void handleEnd (Span span , REQUEST request , RESPONSE response , Throwable t ) {
117- for (MessagingSpanCustomizer <REQUEST , RESPONSE > customizer : spanCustomizers ) {
118- try {
119- customizer .onEnd (span , Context .current (), request , response , t );
120- } catch (Exception e ) {
121- LOG .log (Level .WARNING , "Exception occurred while customizing span on end." , e );
122- }
80+ protected void handleEnd (Span span , REQUEST request , Throwable t ) {
81+ AttributesBuilder builder = Attributes .builder ();
82+ for (AttributesExtractor <REQUEST , Void > extractor : this .attributesExtractors ) {
83+ extractor .onEnd (builder , Context .current (), request , null , t );
12384 }
12485 span .end ();
12586 }
@@ -132,11 +93,11 @@ protected String getDefaultSpanName(String destination) {
13293 }
13394
13495 protected MessagingProcessWrapper (OpenTelemetry openTelemetry ,
135- TextMapGetter <REQUEST > textMapGetter ,
136- List <MessagingSpanCustomizer <REQUEST , RESPONSE >> spanCustomizers ) {
96+ @ Nullable TextMapGetter <REQUEST > textMapGetter ,
97+ List <AttributesExtractor <REQUEST , Void >> attributesExtractors ) {
13798 this .textMapPropagator = openTelemetry .getPropagators ().getTextMapPropagator ();
13899 this .tracer = openTelemetry .getTracer (INSTRUMENTATION_SCOPE + "-" + INSTRUMENTATION_VERSION );
139100 this .textMapGetter = textMapGetter ;
140- this .spanCustomizers = spanCustomizers ;
101+ this .attributesExtractors = attributesExtractors ;
141102 }
142103}
0 commit comments