Skip to content

Commit 20f4ddd

Browse files
committed
Add EventToSpanEventBridge
1 parent 0b7233e commit 20f4ddd

File tree

8 files changed

+401
-2
lines changed

8 files changed

+401
-2
lines changed

processors/README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,34 @@
11
# Processors
22

3-
This module provides tools to intercept and process signals globally.
3+
## Interceptable exporters
4+
5+
This module provides tools to intercept and process signals globally:
6+
7+
* `InterceptableSpanExporter`
8+
* `InterceptableMetricExporter`
9+
* `InterceptableLogRecordExporter`
10+
11+
## Event to SpanEvent Bridge
12+
13+
`EventToSpanEventBridge` is a `LogRecordProcessor` which records events (i.e. log records with an `event.name` attribute) as span events for the current span if:
14+
15+
* The log record has a valid span context
16+
* `Span.current()` returns a span where `Span.isRecording()` is true
17+
18+
For details of how the event log record is translated to span event, see [EventToSpanEventBridge Javadoc](./src/main/java/io/opentelemetry/contrib/eventbridge/EventToSpanEventBridge.java).
19+
20+
`EventToSpanEventBridge` can be referenced in [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration) as follows:
21+
22+
```yaml
23+
// Configure tracer provider as usual, omitted for brevity
24+
tracer_provider: ...
25+
26+
logger_provider:
27+
processors:
28+
- event_to_span_event_bridge: {}
29+
```
30+
31+
// TODO(jack-berg): remove "{}" after merging / rle https://github.com/open-telemetry/opentelemetry-java/pull/6891/files
432
533
## Component owners
634

processors/build.gradle.kts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,13 @@ java {
1313

1414
dependencies {
1515
api("io.opentelemetry:opentelemetry-sdk")
16+
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")
17+
18+
// For EventToSpanEventBridge
19+
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
20+
implementation("com.fasterxml.jackson.core:jackson-core")
21+
1622
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
23+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
24+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
1725
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.eventbridge;
7+
8+
import io.opentelemetry.api.common.AttributeKey;
9+
import io.opentelemetry.api.common.Attributes;
10+
import io.opentelemetry.api.common.AttributesBuilder;
11+
import io.opentelemetry.api.common.Value;
12+
import io.opentelemetry.api.trace.Span;
13+
import io.opentelemetry.context.Context;
14+
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
15+
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler;
16+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
17+
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
18+
import io.opentelemetry.sdk.logs.data.LogRecordData;
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.nio.charset.StandardCharsets;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.logging.Level;
24+
import java.util.logging.Logger;
25+
26+
/**
27+
* A processor that records events (i.e. log records with an {@code event.name} attribute) as span
28+
* events for the current span if:
29+
*
30+
* <ul>
31+
* <li>The log record has a valid span context
32+
* <li>{@link Span#current()} returns a span where {@link Span#isRecording()} is true
33+
* </ul>
34+
*
35+
* <p>The event {@link LogRecordData} is converted to attributes on the span event as follows:
36+
*
37+
* <ul>
38+
* <li>{@code event.name} attribute is mapped to span event name
39+
* <li>{@link LogRecordData#getTimestampEpochNanos()} is mapped to span event timestamp
40+
* <li>{@link LogRecordData#getAttributes()} are mapped to span event attributes, excluding {@code
41+
* event.name}
42+
* <li>{@link LogRecordData#getObservedTimestampEpochNanos()} is mapped to span event attribute
43+
* with key {@code log.record.observed_timestamp}
44+
* <li>{@link LogRecordData#getSeverity()} is mapped to span event attribute with key {@code
45+
* log.record.severity_number}
46+
* <li>{@link LogRecordData#getBodyValue()} is mapped to span event attribute with key {@code
47+
* log.record.body}, as an escaped JSON string following the standard protobuf JSON encoding
48+
* <li>{@link LogRecordData#getTotalAttributeCount()} - {@link
49+
* LogRecordData#getAttributes()}.size() is mapped to span event attribute with key {@code
50+
* log.record.dropped_attributes_count}
51+
* </ul>
52+
*/
53+
public final class EventToSpanEventBridge implements LogRecordProcessor {
54+
55+
private static final Logger LOGGER = Logger.getLogger(EventToSpanEventBridge.class.getName());
56+
57+
private static final AttributeKey<String> EVENT_NAME = AttributeKey.stringKey("event.name");
58+
private static final AttributeKey<Long> LOG_RECORD_OBSERVED_TIME_UNIX_NANO =
59+
AttributeKey.longKey("log.record.observed_time_unix_nano");
60+
private static final AttributeKey<Long> LOG_RECORD_SEVERITY_NUMBER =
61+
AttributeKey.longKey("log.record.severity_number");
62+
private static final AttributeKey<String> LOG_RECORD_BODY =
63+
AttributeKey.stringKey("log.record.body");
64+
private static final AttributeKey<Long> LOG_RECORD_DROPPED_ATTRIBUTES_COUNT =
65+
AttributeKey.longKey("log.record.dropped_attributes_count");
66+
67+
private EventToSpanEventBridge() {}
68+
69+
/** Create an instance. */
70+
public static EventToSpanEventBridge create() {
71+
return new EventToSpanEventBridge();
72+
}
73+
74+
@Override
75+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
76+
LogRecordData logRecordData = logRecord.toLogRecordData();
77+
String eventName = logRecordData.getAttributes().get(EVENT_NAME);
78+
if (eventName == null) {
79+
return;
80+
}
81+
if (!logRecordData.getSpanContext().isValid()) {
82+
return;
83+
}
84+
Span currentSpan = Span.current();
85+
if (!currentSpan.isRecording()) {
86+
return;
87+
}
88+
currentSpan.addEvent(
89+
eventName,
90+
toSpanEventAttributes(logRecordData),
91+
logRecordData.getTimestampEpochNanos(),
92+
TimeUnit.NANOSECONDS);
93+
}
94+
95+
private static Attributes toSpanEventAttributes(LogRecordData logRecord) {
96+
AttributesBuilder builder =
97+
logRecord.getAttributes().toBuilder().removeIf(key -> key.equals(EVENT_NAME));
98+
99+
builder.put(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, logRecord.getObservedTimestampEpochNanos());
100+
101+
builder.put(LOG_RECORD_SEVERITY_NUMBER, logRecord.getSeverity().getSeverityNumber());
102+
103+
// Add bridging for logRecord.getSeverityText() if EventBuilder adds severity text setter
104+
105+
Value<?> body = logRecord.getBodyValue();
106+
if (body != null) {
107+
MarshalerWithSize marshaler = AnyValueMarshaler.create(body);
108+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
109+
try {
110+
marshaler.writeJsonTo(baos);
111+
} catch (IOException e) {
112+
LOGGER.log(Level.WARNING, "Error converting log record body to JSON", e);
113+
}
114+
builder.put(LOG_RECORD_BODY, new String(baos.toByteArray(), StandardCharsets.UTF_8));
115+
}
116+
117+
int droppedAttributesCount =
118+
logRecord.getTotalAttributeCount() - logRecord.getAttributes().size();
119+
if (droppedAttributesCount > 0) {
120+
builder.put(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
121+
}
122+
123+
return builder.build();
124+
}
125+
126+
@Override
127+
public String toString() {
128+
return "EventToSpanEventBridge{}";
129+
}
130+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.eventbridge.internal;
7+
8+
import io.opentelemetry.contrib.eventbridge.EventToSpanEventBridge;
9+
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
10+
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
11+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
12+
13+
/**
14+
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}.
15+
*
16+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
17+
* at any time.
18+
*/
19+
public class EventToSpanEventBridgeComponentProvider
20+
implements ComponentProvider<LogRecordProcessor> {
21+
22+
@Override
23+
public Class<LogRecordProcessor> getType() {
24+
return LogRecordProcessor.class;
25+
}
26+
27+
@Override
28+
public String getName() {
29+
return "event_to_span_event_bridge";
30+
}
31+
32+
@Override
33+
public LogRecordProcessor create(StructuredConfigProperties config) {
34+
return EventToSpanEventBridge.create();
35+
}
36+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.opentelemetry.contrib.eventbridge.internal.EventToSpanEventBridgeComponentProvider
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.eventbridge;
7+
8+
import static io.opentelemetry.api.common.AttributeKey.longKey;
9+
import static io.opentelemetry.api.common.AttributeKey.stringKey;
10+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
11+
12+
import io.opentelemetry.api.common.Attributes;
13+
import io.opentelemetry.api.common.Value;
14+
import io.opentelemetry.api.incubator.events.EventLogger;
15+
import io.opentelemetry.api.logs.Severity;
16+
import io.opentelemetry.api.trace.Span;
17+
import io.opentelemetry.api.trace.SpanKind;
18+
import io.opentelemetry.api.trace.Tracer;
19+
import io.opentelemetry.context.Context;
20+
import io.opentelemetry.context.Scope;
21+
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
22+
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
23+
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
24+
import io.opentelemetry.sdk.testing.time.TestClock;
25+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
26+
import io.opentelemetry.sdk.trace.data.LinkData;
27+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
28+
import io.opentelemetry.sdk.trace.samplers.Sampler;
29+
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
30+
import java.time.Instant;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.concurrent.TimeUnit;
34+
import org.junit.jupiter.api.Test;
35+
36+
class EventToSpanEventBridgeTest {
37+
38+
private final InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
39+
private final SdkTracerProvider tracerProvider =
40+
SdkTracerProvider.builder()
41+
.setSampler(onlyServerSpans())
42+
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
43+
.build();
44+
private final TestClock testClock = TestClock.create();
45+
private final SdkEventLoggerProvider eventLoggerProvider =
46+
SdkEventLoggerProvider.create(
47+
SdkLoggerProvider.builder()
48+
.setClock(testClock)
49+
.addLogRecordProcessor(EventToSpanEventBridge.create())
50+
.build());
51+
private final Tracer tracer = tracerProvider.get("tracer");
52+
private final EventLogger eventLogger = eventLoggerProvider.get("event-logger");
53+
54+
private static Sampler onlyServerSpans() {
55+
return new Sampler() {
56+
@Override
57+
public SamplingResult shouldSample(
58+
Context parentContext,
59+
String traceId,
60+
String name,
61+
SpanKind spanKind,
62+
Attributes attributes,
63+
List<LinkData> parentLinks) {
64+
return SpanKind.SERVER.equals(spanKind)
65+
? SamplingResult.recordAndSample()
66+
: SamplingResult.drop();
67+
}
68+
69+
@Override
70+
public String getDescription() {
71+
return "description";
72+
}
73+
};
74+
}
75+
76+
@Test
77+
void withRecordingSpan_BridgesEvent() {
78+
testClock.setTime(Instant.ofEpochMilli(1));
79+
80+
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan();
81+
try (Scope unused = span.makeCurrent()) {
82+
eventLogger
83+
.builder("my.event-name")
84+
.setTimestamp(100, TimeUnit.NANOSECONDS)
85+
.setSeverity(Severity.DEBUG)
86+
.put("foo", "bar")
87+
.put("number", 1)
88+
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
89+
.setAttributes(Attributes.builder().put("color", "red").build())
90+
.setAttributes(Attributes.builder().put("shape", "square").build())
91+
.emit();
92+
} finally {
93+
span.end();
94+
}
95+
96+
assertThat(spanExporter.getFinishedSpanItems())
97+
.satisfiesExactly(
98+
spanData ->
99+
assertThat(spanData)
100+
.hasName("span")
101+
.hasEventsSatisfyingExactly(
102+
spanEvent ->
103+
spanEvent
104+
.hasName("my.event-name")
105+
.hasTimestamp(100, TimeUnit.NANOSECONDS)
106+
.hasAttributesSatisfying(
107+
attributes -> {
108+
assertThat(attributes.get(stringKey("color")))
109+
.isEqualTo("red");
110+
assertThat(attributes.get(stringKey("shape")))
111+
.isEqualTo("square");
112+
assertThat(
113+
attributes.get(
114+
longKey("log.record.observed_time_unix_nano")))
115+
.isEqualTo(1000000L);
116+
assertThat(
117+
attributes.get(longKey("log.record.severity_number")))
118+
.isEqualTo(Severity.DEBUG.getSeverityNumber());
119+
assertThat(attributes.get(stringKey("log.record.body")))
120+
.isEqualTo(
121+
"{\"kvlistValue\":{\"values\":[{\"key\":\"number\",\"value\":{\"intValue\":\"1\"}},{\"key\":\"foo\",\"value\":{\"stringValue\":\"bar\"}},{\"key\":\"map\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}}]}}");
122+
})));
123+
}
124+
125+
@Test
126+
void noSpan_doesNotBridgeEvent() {
127+
eventLogger
128+
.builder("my.event-name")
129+
.setTimestamp(100, TimeUnit.NANOSECONDS)
130+
.setSeverity(Severity.DEBUG)
131+
.put("foo", "bar")
132+
.put("number", 1)
133+
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
134+
.setAttributes(Attributes.builder().put("color", "red").build())
135+
.setAttributes(Attributes.builder().put("shape", "square").build())
136+
.emit();
137+
138+
assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
139+
}
140+
141+
@Test
142+
void nonRecordingSpan_doesNotBridgeEvent() {
143+
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.INTERNAL).startSpan();
144+
try (Scope unused = span.makeCurrent()) {
145+
eventLogger
146+
.builder("my.event-name")
147+
.setTimestamp(100, TimeUnit.NANOSECONDS)
148+
.setSeverity(Severity.DEBUG)
149+
.put("foo", "bar")
150+
.put("number", 1)
151+
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
152+
.setAttributes(Attributes.builder().put("color", "red").build())
153+
.setAttributes(Attributes.builder().put("shape", "square").build())
154+
.emit();
155+
} finally {
156+
span.end();
157+
}
158+
159+
assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
160+
}
161+
}

0 commit comments

Comments
 (0)