Skip to content

Commit 6082863

Browse files
Add reactor samples and doc (#7906)
* Add reactor samples and doc * Apply suggestions from code review Co-authored-by: Bruce Bujon <[email protected]> * review --------- Co-authored-by: Bruce Bujon <[email protected]>
1 parent f0b7459 commit 6082863

File tree

6 files changed

+319
-0
lines changed

6 files changed

+319
-0
lines changed

docs/reactor/README.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Reactor with dd-trace-java
2+
3+
This project shows basic examples of manual tracing with reactor and the datadog java tracer.
4+
The examples are provided in the form of JUnit tests. Manual tracing is achieved using the OpenTelemetry APIs.
5+
6+
## Get started
7+
8+
The project is configured to run with a JDK 17 and Apache Maven.
9+
To get started on a command line, just type `mvn test`. It will generate and log traces in a json format on the
10+
console.
11+
12+
## How the context is propagated
13+
14+
The Reactor context propagates bottom up from the first subscriber (last publisher) to the last subscriber (first publisher).
15+
The datadog tracer captures the active span when the subscription happens (i.e. methods like `subcribe` or `block` are called)
16+
and activates them when a publisher emits if there is no an already active span.
17+
18+
This works out of the box for bottom-up propagation. For the opposite, top-down, the reactor context has to be used in order
19+
to let know the publisher which span needs to be activated when emitting downstream. The span has to be
20+
added to the context using the key `dd.span`.
21+
22+
## Use cases
23+
24+
The use cases are reflecting the tests in this project
25+
26+
### Standard bottom-up context propagation
27+
28+
The sample leverages the `@Trace` annotation to create a span when a method returning a `Mono` or `Flux` is called.
29+
The annotation is available as part of the `dd-trace-api` library. Alternatively, the OpenTelemetry equivalent `@WithSpan` can
30+
also be used.
31+
32+
```java
33+
@Trace(operationName = "mono", resourceName = "mono")
34+
private Mono<String> monoMethod() {
35+
// ...
36+
}
37+
```
38+
39+
Since the tracer runs with the option `-Ddd.trace.annotation.async=true`, it will finish the span when the `Mono`
40+
will complete and not when the method will return.
41+
42+
In this test the context is captured when `block()` is called and every other span opened by the upstream operators
43+
will have it as parent.
44+
45+
The diagram below shows the span propagated onSubscribe (up arrows)
46+
and the span propagated downstream onNext/onError/onComplete (down arrows).
47+
48+
```mermaid
49+
graph TD;
50+
m1[Mono.just<br><em>creates mono</em>]-->|parent|m2;
51+
m2[Mono.map<br><em>creates child</em>]-->|parent|m3[Mono.block];
52+
m3-->|parent|m2;
53+
m2-->|parent|m1;
54+
```
55+
56+
The resulting trace:
57+
58+
![img.png](img.png)
59+
60+
61+
### Top Down context propagation
62+
63+
The context propagation can be changed by advising the span to publish via the reactor `Context`.
64+
The span should be put under the key `dd.span`.
65+
As soon as a publisher emits this span, all the downstream operators will also have that span as active.
66+
It is important to use `ContextWrite` in the right places in the reactive chain for this reason.
67+
68+
Relating to the `testSimpleDownstreamPropagation` test case, the reactive chains will capture `parent` as bottom-up propagation
69+
when `block` is called, but then the propagation is changed when `contextWrite("dd.span", ...)` is called.
70+
71+
The diagram below shows what's happening:
72+
73+
```mermaid
74+
graph TD;
75+
m1[Mono.defer+contextWrite<br><em>creates mono</em>]-->|mono|m2;
76+
m2[Mono.map<br><em>creates child</em>]-->|mono|m3[Mono.block];
77+
m3-->|parent|m2;
78+
m2-->|parent|m1;
79+
```
80+
81+
The resulting trace:
82+
83+
![img_1.png](img_1.png)
84+
85+
### A more complex scenario
86+
87+
`ContextWrite` can be called several times in the chain in order to change the active span that will be propagated.
88+
In fact, generally speaking, when a span is put in the context, it will be propagated by all the upstream publishers
89+
that will have visibility to that reactor context.
90+
91+
Referring to the `testComplexDownstreamPropagation` test case, the propagation is resumed in the following (simplified) diagram:
92+
93+
```mermaid
94+
graph TD;
95+
m1[Mono.flatmap<br><em>creates first</em>]-->|first|m2;
96+
m2[Mono.contextWrite<br><em>set first</em>]-->|first|m3;
97+
m3[Mono.map<br><em>creates child</em>]-->|first|m4;
98+
m4[Mono.flatmap<br><em>creates second</em>]-->|second|m5;
99+
m5[Mono.contextWrite<br><em>set second</em>]-->|second|m6;
100+
m6[Mono.flatmap+contextWrite<br><em>creates third</em>]-->|third|m7;
101+
m7[Mono.flatmap+contextWrite<br><em>creates third</em>]-->|third|m8;
102+
m8[Mono.map<br><em>creates child</em>]-->|third|m9[Mono.block];
103+
m6-->|parent|m5;
104+
m5-->|parent|m4;
105+
m4-->|third|m3;
106+
m3-->|second|m2;
107+
m2-->|second|m1;
108+
m10[start]-->|parent|m1;
109+
```
110+
111+
The graph starts with parent since it's the span captured when `block` is called.
112+
Each flatmap changes the context's active span when `onNext` is signaled
113+
114+
The resulting trace:
115+
![img_2.png](img_2.png)
116+

docs/reactor/img.png

7.67 KB
Loading

docs/reactor/img_1.png

11.8 KB
Loading

docs/reactor/img_2.png

12.8 KB
Loading

docs/reactor/pom.xml

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>dd-trace-java-reactor-examples</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>17</maven.compiler.source>
13+
<maven.compiler.target>17</maven.compiler.target>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
<dependencies>
17+
<dependency>
18+
<groupId>io.projectreactor</groupId>
19+
<artifactId>reactor-core</artifactId>
20+
<version>3.6.11</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.junit.jupiter</groupId>
24+
<artifactId>junit-jupiter-engine</artifactId>
25+
<version>5.11.2</version>
26+
<scope>test</scope>
27+
</dependency>
28+
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
29+
<dependency>
30+
<groupId>ch.qos.logback</groupId>
31+
<artifactId>logback-classic</artifactId>
32+
<version>1.5.11</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.opentelemetry</groupId>
36+
<artifactId>opentelemetry-api</artifactId>
37+
<version>1.43.0</version>
38+
</dependency>
39+
<dependency>
40+
<groupId>com.datadoghq</groupId>
41+
<artifactId>dd-trace-api</artifactId>
42+
<version>1.42.0</version>
43+
</dependency>
44+
</dependencies>
45+
<build>
46+
<plugins>
47+
<plugin>
48+
<groupId>org.apache.maven.plugins</groupId>
49+
<artifactId>maven-dependency-plugin</artifactId>
50+
<version>3.8.0</version>
51+
<executions>
52+
<execution>
53+
<id>copy-agent</id>
54+
<phase>process-test-classes</phase>
55+
<goals>
56+
<goal>copy</goal>
57+
</goals>
58+
<configuration>
59+
<artifactItems>
60+
<artifactItem>
61+
<groupId>com.datadoghq</groupId>
62+
<artifactId>dd-java-agent</artifactId>
63+
<version>1.42.0</version>
64+
<outputDirectory>${project.build.directory}/agents</outputDirectory>
65+
<destFileName>dd-java-agent.jar</destFileName>
66+
</artifactItem>
67+
</artifactItems>
68+
</configuration>
69+
</execution>
70+
</executions>
71+
</plugin>
72+
<plugin>
73+
<groupId>org.apache.maven.plugins</groupId>
74+
<artifactId>maven-surefire-plugin</artifactId>
75+
<version>2.12.4</version>
76+
<configuration>
77+
<argLine>-javaagent:${project.build.directory}/agents/dd-java-agent.jar -Ddd.trace.otel.enabled=true -Ddd.trace.annotation.async=true</argLine>
78+
</configuration>
79+
</plugin>
80+
</plugins>
81+
</build>
82+
83+
</project>
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package test;
2+
3+
import datadog.trace.api.Trace;
4+
import io.opentelemetry.api.GlobalOpenTelemetry;
5+
import io.opentelemetry.api.trace.Span;
6+
import io.opentelemetry.api.trace.SpanBuilder;
7+
import io.opentelemetry.api.trace.Tracer;
8+
import io.opentelemetry.context.Scope;
9+
import java.time.Duration;
10+
import java.util.Objects;
11+
import java.util.Random;
12+
import org.junit.jupiter.api.Test;
13+
import reactor.core.publisher.Mono;
14+
import reactor.util.context.Context;
15+
16+
public class ReactorTest {
17+
18+
@Trace(operationName = "child", resourceName = "child")
19+
private String doSomeMapping(String s) {
20+
try {
21+
// simulate some work
22+
Thread.sleep(500 + new Random(System.currentTimeMillis()).nextInt(1000));
23+
} catch (InterruptedException ie) {
24+
Thread.currentThread().interrupt();
25+
}
26+
return s;
27+
}
28+
29+
@Trace(operationName = "mono", resourceName = "mono")
30+
private Mono<String> monoMethod() {
31+
// This mono will complete when the delay is expired
32+
return Mono.delay(Duration.ofSeconds(1)).map(ignored -> "Hello World");
33+
}
34+
35+
@Trace(operationName = "mono", resourceName = "mono")
36+
private Mono<String> monoMethodDownstreamPropagate() {
37+
// here the active span is the one created by the @Trace annotation before the method executes
38+
return Mono.just("Hello World").contextWrite(Context.of("dd.span", Span.current()));
39+
}
40+
41+
private <T> Mono<T> tracedMono(
42+
final Tracer tracer, final String spanName, Span parentSpan, final Mono<T> mono) {
43+
SpanBuilder spanBuilder = tracer.spanBuilder(spanName);
44+
if (parentSpan != null) {
45+
spanBuilder.setParent(io.opentelemetry.context.Context.current().with(parentSpan));
46+
}
47+
final Span span = spanBuilder.startSpan();
48+
return mono //
49+
.contextWrite(Context.of("dd.span", span))
50+
.doFinally(ignored -> span.end());
51+
}
52+
53+
@Test
54+
public void testSimpleUpstreamPropagation() {
55+
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
56+
final Span parent = tracer.spanBuilder("parent").startSpan();
57+
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();
58+
try (final Scope parentScope = parent.makeCurrent()) {
59+
// monoMethod will start a trace when called but that span will complete only when the
60+
// returned mono completes.
61+
// doSomeMapping will open a span that's child of parent because it's the active one when we
62+
// subscribe
63+
assert Objects.equals(monoMethod().map(this::doSomeMapping).block(), "Hello World");
64+
} finally {
65+
parent.end();
66+
}
67+
}
68+
69+
@Test
70+
public void testSimpleDownstreamPropagation() {
71+
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
72+
final Span parent = tracer.spanBuilder("parent").startSpan();
73+
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();
74+
try (final Scope parentScope = parent.makeCurrent()) {
75+
// monoMethod will start a trace when called but that span will complete only when the
76+
// returned mono completes.
77+
// doSomeMapping will open a span that's child of parent because it's the active one when we
78+
// subscribe
79+
assert Objects.equals(
80+
Mono.defer(this::monoMethodDownstreamPropagate).map(this::doSomeMapping).block(),
81+
"Hello World");
82+
} finally {
83+
parent.end();
84+
}
85+
}
86+
87+
@Test
88+
public void testComplexDownstreamPropagation() {
89+
final Tracer tracer = GlobalOpenTelemetry.getTracer("");
90+
final Span parent = tracer.spanBuilder("parent").startSpan();
91+
assert io.opentelemetry.context.Context.current() == io.opentelemetry.context.Context.root();
92+
93+
Mono<String> mono =
94+
// here we have no active span. when the mono is emitted we propagate the context captured
95+
// onSubscribe
96+
Mono.just("Hello World") //
97+
// change the downstream propagated span to that new one called 'first'
98+
// first will be child of parent since parent was captured onSubscribe
99+
// (when block is called) and propagated upstream
100+
.flatMap(s -> tracedMono(tracer, "first", null, Mono.just(s + ", GoodBye ")))
101+
// map will use the active one (first) hence the child will be under first
102+
.map(this::doSomeMapping)
103+
// we change again the downstream active span to 'second' that's child of 'first'
104+
.flatMap(
105+
s ->
106+
tracedMono(
107+
tracer, "second", null, Mono.create(sink -> sink.success(s + "World"))))
108+
// now we let the downstream propagate third child of parent
109+
.flatMap(s -> tracedMono(tracer, "third", parent, Mono.just(s + "!")))
110+
// third is the active span downstream
111+
.map(this::doSomeMapping) // will create a child span having third as parent
112+
.doOnNext(System.out::println);
113+
try (final Scope parentScope = parent.makeCurrent()) {
114+
// block, like subscribe will capture the current scope (parent here) and propagate upstream
115+
assert Objects.equals(mono.block(), "Hello World, GoodBye World!");
116+
} finally {
117+
parent.end();
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)