Skip to content

Commit df937df

Browse files
committed
Initial OTel context propagation
1 parent 156c63d commit df937df

File tree

13 files changed

+414
-3
lines changed

13 files changed

+414
-3
lines changed

bosk-opentelemetry/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## bosk-opentelemetry
2+
3+
This is the subproject for the published `bosk-opentelemetry` library,
4+
including support for propagating OpenTelemetry context via bosk diagnostic attributes.

bosk-opentelemetry/build.gradle

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
plugins {
3+
id 'bosk.development'
4+
id 'bosk.maven-publish'
5+
id 'com.github.spotbugs' version '6.1.2'
6+
}
7+
8+
java {
9+
toolchain {
10+
languageVersion = JavaLanguageVersion.of(jdkVersion)
11+
}
12+
}
13+
14+
compileJava {
15+
options.release = prodJavaVersion
16+
}
17+
18+
compileTestJava {
19+
options.release = null
20+
}
21+
22+
repositories {
23+
mavenCentral()
24+
}
25+
26+
dependencies {
27+
api project(":bosk-core")
28+
implementation 'io.opentelemetry:opentelemetry-sdk:1.48.0'
29+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package works.bosk.opentelemetry;
2+
3+
import works.bosk.BoskDiagnosticContext;
4+
import works.bosk.BoskDriver;
5+
import works.bosk.DriverFactory;
6+
import works.bosk.DriverStack;
7+
import works.bosk.StateTreeNode;
8+
import works.bosk.drivers.DiagnosticScopeDriver;
9+
10+
public sealed interface OpenTelemetryDriver extends BoskDriver permits ReceiverDriver {
11+
/**
12+
* @return a {@link DriverFactory} that transmits OpenTelemetry context
13+
* across the given {@code subject} driver
14+
* via the {@link BoskDiagnosticContext diagnostic context}.
15+
*
16+
* @see OpenTelemetryRegistrar
17+
*/
18+
static <RR extends StateTreeNode> DriverFactory<RR> wrapping(DriverFactory<RR> subject) {
19+
return DriverStack.of(
20+
DiagnosticScopeDriver.factory(Utils::diagnosticScopeWithContextFromCurrentSpan),
21+
subject,
22+
ReceiverDriver.factory()
23+
);
24+
}
25+
26+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package works.bosk.opentelemetry;
2+
3+
import works.bosk.BoskDiagnosticContext;
4+
import works.bosk.BoskHook;
5+
import works.bosk.HookRegistrar;
6+
import works.bosk.Reference;
7+
import works.bosk.RegistrarFactory;
8+
9+
/**
10+
* {@link HookRegistrar} that propagates
11+
* OpenTelemetry context from the {@link BoskDiagnosticContext diagnostic context}
12+
* into hooks.
13+
* <p>
14+
* Thread-locals are not automatically propagated into hooks,
15+
* and so OpenTelemetry context must be explicitly propagated.
16+
* This registrar retrieves diagnostic attributes placed there by {@link OpenTelemetryDriver}
17+
* and propagate them into hooks.
18+
*/
19+
public final class OpenTelemetryRegistrar implements HookRegistrar {
20+
final BoskDiagnosticContext diagnosticContext;
21+
final HookRegistrar downstream;
22+
23+
OpenTelemetryRegistrar(BoskDiagnosticContext diagnosticContext, HookRegistrar downstream) {
24+
this.diagnosticContext = diagnosticContext;
25+
this.downstream = downstream;
26+
}
27+
28+
/**
29+
* @return a {@link HookRegistrar} that propagates OpenTelemetry context
30+
* from the {@link BoskDiagnosticContext diagnostic context} into hooks.
31+
*/
32+
public static RegistrarFactory factory() {
33+
return (b,d) -> new OpenTelemetryRegistrar(b.diagnosticContext(), d);
34+
}
35+
36+
@Override
37+
public <T> void registerHook(String name, Reference<T> scope, BoskHook<T> hook) {
38+
downstream.registerHook(name, scope, ref -> {
39+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
40+
hook.onChanged(ref);
41+
}
42+
});
43+
}
44+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package works.bosk.opentelemetry;
2+
3+
import java.io.IOException;
4+
import java.lang.reflect.Type;
5+
import works.bosk.BoskDiagnosticContext;
6+
import works.bosk.BoskDriver;
7+
import works.bosk.DriverFactory;
8+
import works.bosk.Identifier;
9+
import works.bosk.Reference;
10+
import works.bosk.StateTreeNode;
11+
import works.bosk.exceptions.InvalidTypeException;
12+
13+
/**
14+
* Propagates OpenTelemetry context from the {@link BoskDiagnosticContext doagnostic context}
15+
* into the downstream driver.
16+
*/
17+
final class ReceiverDriver implements OpenTelemetryDriver {
18+
19+
private final BoskDiagnosticContext diagnosticContext;
20+
private final BoskDriver downstream;
21+
22+
public static <R extends StateTreeNode> DriverFactory<R> factory() {
23+
return (b, d) -> new ReceiverDriver(b.diagnosticContext(), d);
24+
}
25+
26+
ReceiverDriver(BoskDiagnosticContext diagnosticContext1, BoskDriver downstream) {
27+
this.diagnosticContext = diagnosticContext1;
28+
this.downstream = downstream;
29+
}
30+
31+
@Override
32+
public StateTreeNode initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
33+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
34+
return downstream.initialRoot(rootType);
35+
}
36+
}
37+
38+
@Override
39+
public <T> void submitReplacement(Reference<T> target, T newValue) {
40+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
41+
downstream.submitReplacement(target, newValue);
42+
}
43+
}
44+
45+
@Override
46+
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
47+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
48+
downstream.submitConditionalReplacement(target, newValue, precondition, requiredValue);
49+
}
50+
}
51+
52+
@Override
53+
public <T> void submitConditionalCreation(Reference<T> target, T newValue) {
54+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
55+
downstream.submitConditionalCreation(target, newValue);
56+
}
57+
}
58+
59+
@Override
60+
public <T> void submitDeletion(Reference<T> target) {
61+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
62+
downstream.submitDeletion(target);
63+
}
64+
}
65+
66+
@Override
67+
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
68+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
69+
downstream.submitConditionalDeletion(target, precondition, requiredValue);
70+
}
71+
}
72+
73+
@Override
74+
public void flush() throws IOException, InterruptedException {
75+
try (var __ = Utils.contextFromDiagnosticAttributes(diagnosticContext).makeCurrent()) {
76+
downstream.flush();
77+
}
78+
}
79+
80+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package works.bosk.opentelemetry;
2+
3+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
4+
import io.opentelemetry.context.Context;
5+
import io.opentelemetry.context.propagation.TextMapGetter;
6+
import io.opentelemetry.context.propagation.TextMapSetter;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import works.bosk.BoskDiagnosticContext;
10+
import works.bosk.MapValue;
11+
12+
public class Utils {
13+
/**
14+
* Prefix for OpenTelemetry-related diagnostic attributes.
15+
*/
16+
static final String OTEL_PREFIX = "otel.";
17+
18+
/**
19+
* We use the w3c context propagation format.
20+
* Future-proof this putting the attributes under a sub-prefix so we can evolve this later.
21+
*/
22+
static final String W3C_SUB_PREFIX = "w3c.";
23+
24+
/**
25+
* On the setter side, we'll be given plain old OTel keys and need to add the prefixes.
26+
* We add {@link #W3C_SUB_PREFIX} to them here, and then the {@link #OTEL_PREFIX} prefix
27+
* is added by {@link BoskDiagnosticContext#withReplacedPrefix}.
28+
*/
29+
static final TextMapSetter<Map<String, String>> W3C_MAP_SETTER = (carrier, key, value) -> {
30+
if (carrier != null) {
31+
carrier.put(W3C_SUB_PREFIX + key, value);
32+
}
33+
};
34+
35+
/**
36+
* On the getter side, it's simplest to deal with plain old OTel keys.
37+
* Any prefixes are stripped off while we're building the carrier map.
38+
*/
39+
private static final TextMapGetter<Map<String, String>> MAP_GETTER = new TextMapGetter<>() {
40+
@Override
41+
public Iterable<String> keys(Map<String, String> carrier) {
42+
return carrier.keySet();
43+
}
44+
45+
@Override
46+
public String get(Map<String, String> carrier, String key) {
47+
if (carrier == null) {
48+
return null;
49+
} else {
50+
return carrier.get(key);
51+
}
52+
}
53+
};
54+
55+
static Context contextFromDiagnosticAttributes(BoskDiagnosticContext diagnosticContext) {
56+
Map<String, String> otelAttributes = new HashMap<>();
57+
diagnosticContext.getAttributes().forEach((k, v) -> {
58+
if (k.startsWith(OTEL_PREFIX)) {
59+
if (k.startsWith(OTEL_PREFIX + W3C_SUB_PREFIX)) {
60+
otelAttributes.put(k.substring(OTEL_PREFIX.length() + W3C_SUB_PREFIX.length()), v);
61+
} else {
62+
throw new IllegalStateException("Only w3c context propagation is supported; unexpected OpenTelemetry key from diagnostic context: " + k);
63+
}
64+
}
65+
});
66+
return W3CTraceContextPropagator.getInstance()
67+
.extract(Context.current(), otelAttributes, MAP_GETTER);
68+
}
69+
70+
/**
71+
* Stashes OpenTelemetry context into the {@link BoskDiagnosticContext doagnostic context},
72+
* where it can be retrieved by
73+
* {@link ReceiverDriver} to propagate the context downstream,
74+
* and by {@link OpenTelemetryRegistrar} to propagate the context into hooks.
75+
*/
76+
static BoskDiagnosticContext.DiagnosticScope diagnosticScopeWithContextFromCurrentSpan(BoskDiagnosticContext dc) {
77+
return dc.withReplacedPrefix(OTEL_PREFIX, w3cContextFromCurrentSpan());
78+
}
79+
80+
static MapValue<String> w3cContextFromCurrentSpan() {
81+
Map<String, String> result = new HashMap<>();
82+
W3CTraceContextPropagator.getInstance()
83+
.inject(Context.current(), result, W3C_MAP_SETTER);
84+
return MapValue.copyOf(result);
85+
}
86+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/**
2+
* Provides OpenTelemetry context propagation for Bosk.
3+
* <p>
4+
* To use this, configure your bosk instance to do the following two things:
5+
* <ol>
6+
* <li>
7+
* Use {@link works.bosk.opentelemetry.OpenTelemetryDriver#wrapping} on any driver
8+
* that does not implicitly propagate thread context by calling its downstream driver
9+
* synchronously on the same thread.
10+
* </li>
11+
* <li>
12+
* Use {@link works.bosk.opentelemetry.OpenTelemetryRegistrar#factory()}.
13+
* </li>
14+
* </ol>
15+
*/
16+
package works.bosk.opentelemetry;
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package works.bosk.opentelemetry;
2+
3+
import io.opentelemetry.api.trace.Span;
4+
import io.opentelemetry.api.trace.TraceId;
5+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
6+
import io.opentelemetry.context.Context;
7+
import io.opentelemetry.context.propagation.ContextPropagators;
8+
import io.opentelemetry.sdk.OpenTelemetrySdk;
9+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
10+
import java.io.IOException;
11+
import java.util.concurrent.BlockingQueue;
12+
import java.util.concurrent.LinkedBlockingQueue;
13+
import org.junit.jupiter.api.AfterAll;
14+
import org.junit.jupiter.api.BeforeAll;
15+
import org.junit.jupiter.api.Test;
16+
import works.bosk.Bosk;
17+
import works.bosk.DriverStack;
18+
import works.bosk.StateTreeNode;
19+
import works.bosk.drivers.BufferingDriver;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
23+
class OpenTelemetryDriverTest {
24+
25+
private static OpenTelemetrySdk openTelemetry;
26+
27+
public record Root(Integer revision) implements StateTreeNode {}
28+
29+
@BeforeAll
30+
static void setup() {
31+
openTelemetry = OpenTelemetrySdk.builder()
32+
.setTracerProvider(SdkTracerProvider.builder().build())
33+
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
34+
.buildAndRegisterGlobal();
35+
}
36+
37+
@AfterAll
38+
static void teardown() {
39+
openTelemetry.close();
40+
}
41+
42+
@Test
43+
void wrapping_propagatesTraceId() throws InterruptedException, IOException {
44+
var bosk = new Bosk<Root>(
45+
"test-bosk",
46+
Root.class,
47+
_ -> new Root(0),
48+
DriverStack.of(
49+
OpenTelemetryDriver.wrapping(
50+
// Use a driver that does not call its downstream driver synchronously on the same thread
51+
// so that the OpenTelemetry thread context is not propagated implicitly.
52+
// Otherwise, this isn't much of a test.
53+
BufferingDriver.factory()
54+
)
55+
),
56+
OpenTelemetryRegistrar.factory());
57+
record Observation(int revision, String traceID) { }
58+
BlockingQueue<Observation> observations = new LinkedBlockingQueue<>();
59+
bosk.registerHook("attribute observer", bosk.rootReference(), ref -> {
60+
observations.add(new Observation(ref.value().revision(),
61+
Span.current().getSpanContext().getTraceId()));
62+
});
63+
64+
// There's an initial observation from the time the hook was registered.
65+
// At that time, there was no active trace
66+
assertEquals(new Observation(0, TraceId.getInvalid()), observations.take());
67+
68+
// Submit a change with an active OpenTelemetry span and observe the attributes
69+
Observation expected;
70+
var span = openTelemetry.getTracer("test-scope")
71+
.spanBuilder("test-span")
72+
.setParent(Context.current())
73+
.startSpan();
74+
try (var _ = span.makeCurrent()) {
75+
bosk.driver().submitReplacement(bosk.rootReference(), new Root(1));
76+
expected = new Observation(1, Span.current().getSpanContext().getTraceId());
77+
} finally {
78+
span.end();
79+
}
80+
81+
bosk.driver().flush();
82+
// Check that the trace context was propagated all the way into the hook
83+
assertEquals(expected, observations.take());
84+
assert observations.isEmpty();
85+
}
86+
87+
}

0 commit comments

Comments
 (0)