Skip to content

Commit 18dc5a4

Browse files
authored
ensures span is restored back after onComplete is propagated to the downstream (#2275)
1 parent 7032440 commit 18dc5a4

File tree

2 files changed

+59
-6
lines changed
  • spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web
  • tests/brave/spring-cloud-sleuth-instrumentation-webflux-tests/src/test/java/org/springframework/cloud/sleuth/brave/instrument/web

2 files changed

+59
-6
lines changed

spring-cloud-sleuth-instrumentation/src/main/java/org/springframework/cloud/sleuth/instrument/web/TraceWebFilter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,22 +274,24 @@ public void onNext(Void aVoid) {
274274

275275
@Override
276276
public void onError(Throwable t) {
277-
terminateSpan(t);
278-
this.actual.onError(t);
277+
try (Tracer.SpanInScope ignored = terminateSpan(t)) {
278+
this.actual.onError(t);
279+
}
279280
}
280281

281282
@Override
282283
public void onComplete() {
283-
terminateSpan(null);
284-
this.actual.onComplete();
284+
try (Tracer.SpanInScope ignored = terminateSpan(null)) {
285+
this.actual.onComplete();
286+
}
285287
}
286288

287289
@Override
288290
public Context currentContext() {
289291
return this.context;
290292
}
291293

292-
private void terminateSpan(@Nullable Throwable t) {
294+
private Tracer.SpanInScope terminateSpan(@Nullable Throwable t) {
293295
Object attribute = this.exchange.getAttribute(HandlerMapping.BEST_MATCHING_HANDLER_ATTRIBUTE);
294296
addClassMethodTag(attribute, this.span);
295297
addClassNameTag(attribute, this.span);
@@ -302,7 +304,7 @@ private void terminateSpan(@Nullable Throwable t) {
302304
if (log.isDebugEnabled()) {
303305
log.debug("Handled send of " + this.span);
304306
}
305-
tracer.withSpan(null);
307+
return tracer.withSpan(null);
306308
}
307309

308310
private void addClassMethodTag(Object handler, Span span) {

tests/brave/spring-cloud-sleuth-instrumentation-webflux-tests/src/test/java/org/springframework/cloud/sleuth/brave/instrument/web/TraceWebFluxTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void should_instrument_web_filter() throws Exception {
7070
.run();
7171
TestSpanHandler spans = context.getBean(TestSpanHandler.class);
7272
AssertingWebFilter assertingWebFilter = context.getBean(AssertingWebFilter.class);
73+
InnerAssertingWebFilter innerAssertingWebFilter = context.getBean(InnerAssertingWebFilter.class);
7374
int port = context.getBean(Environment.class).getProperty("local.server.port", Integer.class);
7475
Controller2 controller2 = context.getBean(Controller2.class);
7576
clean(spans, controller2);
@@ -78,6 +79,7 @@ public void should_instrument_web_filter() throws Exception {
7879
ClientResponse response = whenRequestIsSent(port, "/api/c2/10");
7980
// then
8081
thenSpanWasReportedWithTags(spans, response);
82+
thenTraceWasAvailableInDoAfter(innerAssertingWebFilter, spans.spans().get(0).id());
8183
// then #2002
8284
then(response.headers().header("mytraceid")).isNotEmpty();
8385
clean(spans, controller2);
@@ -86,35 +88,41 @@ public void should_instrument_web_filter() throws Exception {
8688
response = whenRequestIsSent(port, "/api/fn/20");
8789
// then
8890
thenFunctionalSpanWasReportedWithTags(spans, response);
91+
thenTraceWasAvailableInDoAfter(innerAssertingWebFilter, spans.spans().get(0).id());
8992
spans.clear();
9093

9194
// when
9295
response = whenRequestIsSent(port, "/missing-endpoint");
9396
// then
9497
thenSpanWith404StatusCodeWasReported(spans, response);
98+
thenTraceWasAvailableInDoAfter(innerAssertingWebFilter, spans.spans().get(0).id());
9599
spans.clear();
96100

97101
// when
98102
response = whenRequestIsSent(port, "/exception");
99103
// then
100104
thenSpanWithExceptionWasReported(spans, response);
105+
thenTraceWasAvailableInDoAfter(innerAssertingWebFilter, spans.spans().get(0).id());
101106
spans.clear();
102107

103108
// when
104109
ClientResponse nonSampledResponse = whenNonSampledRequestIsSent(port);
105110
// then
106111
thenNoSpanWasReported(spans, nonSampledResponse, controller2);
112+
thenSomeTraceWasAvailableInDoAfter(innerAssertingWebFilter);
107113
spans.clear();
108114

109115
// when
110116
ClientResponse skippedPatternResponse = whenRequestIsSentToSkippedPattern(port);
111117
// then
112118
thenNoSpanWasReported(spans, skippedPatternResponse, controller2);
119+
thenSomeTraceWasAvailableInDoAfter(innerAssertingWebFilter);
113120

114121
// when (issue #1683)
115122
response = whenRequestWithXForwardedForIsSent(port, "/api/fn/20");
116123
// then
117124
thenSpanWasReportedWithRemoteIpTags(spans, response);
125+
thenTraceWasAvailableInDoAfter(innerAssertingWebFilter, spans.spans().get(0).id());
118126

119127
thenNoTraceWasLeaked(assertingWebFilter);
120128

@@ -166,6 +174,17 @@ private void thenNoTraceWasLeaked(AssertingWebFilter assertingWebFilter) {
166174
then(assertingWebFilter.getSpans()).isEmpty();
167175
}
168176

177+
178+
private void thenSomeTraceWasAvailableInDoAfter(InnerAssertingWebFilter assertingWebFilter) {
179+
then(assertingWebFilter.getSpans()).hasSize(1);
180+
assertingWebFilter.getSpans().clear();
181+
}
182+
183+
private void thenTraceWasAvailableInDoAfter(InnerAssertingWebFilter assertingWebFilter, String spanId) {
184+
then(assertingWebFilter.getSpans()).hasSize(1).first().matches(s -> s.context().traceIdString().equals(spanId));
185+
assertingWebFilter.getSpans().clear();
186+
}
187+
169188
private void thenNoSpanWasReported(TestSpanHandler spans, ClientResponse response, Controller2 controller2) {
170189
Awaitility.await().untilAsserted(() -> {
171190
then(response.statusCode().value()).isEqualTo(200);
@@ -247,6 +266,38 @@ AssertingWebFilter traceIdInResponseLastFilter(Tracer tracer) {
247266
return new AssertingWebFilter(tracer);
248267
}
249268

269+
270+
@Bean
271+
@Order(Ordered.LOWEST_PRECEDENCE)
272+
WebFilter innerFilterDoAfterTerminate(Tracer tracer) {
273+
return new InnerAssertingWebFilter(tracer);
274+
}
275+
276+
}
277+
278+
static class InnerAssertingWebFilter implements WebFilter {
279+
final Queue<Span> spans = new ConcurrentLinkedQueue<>();
280+
281+
private final Tracer tracer;
282+
283+
InnerAssertingWebFilter(Tracer tracer) {
284+
this.tracer = tracer;
285+
}
286+
287+
@Override
288+
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain webFilterChain) {
289+
return webFilterChain.filter(exchange)
290+
.doAfterTerminate(() -> {
291+
Span currentSpan = tracer.currentSpan();
292+
if (currentSpan != null) {
293+
spans.add(currentSpan);
294+
}
295+
});
296+
}
297+
298+
Queue<Span> getSpans() {
299+
return spans;
300+
}
250301
}
251302

252303
static class AssertingWebFilter implements WebFilter {

0 commit comments

Comments
 (0)