Skip to content

Commit 99f79af

Browse files
committed
[#13477] Update opentelemetry otlp trace span chain
1 parent e151c5d commit 99f79af

File tree

31 files changed

+745
-548
lines changed

31 files changed

+745
-548
lines changed
Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.navercorp.pinpoint.common.server.bo;
22

3-
import com.navercorp.pinpoint.common.trace.AnnotationKey;
4-
53
import java.util.Comparator;
64

75
/**
@@ -10,12 +8,4 @@
108
public final class SpanEventComparator {
119

1210
public static final Comparator<SpanEventBo> INSTANCE = Comparator.comparingInt(SpanEventBo::getSequence);
13-
public static final Comparator<SpanEventBo> ANNOTATION_START_TIME = Comparator.comparingLong(spanEventBo -> {
14-
for(AnnotationBo annotationBo : spanEventBo.getAnnotationBoList()) {
15-
if(AnnotationKey.OPENTELEMETRY_START_TIME.getCode() == annotationBo.getKey()) {
16-
return (long) annotationBo.getValue();
17-
}
18-
}
19-
return 0;
20-
});
2111
}

commons/src/main/java/com/navercorp/pinpoint/common/trace/AnnotationKey.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,6 @@ public interface AnnotationKey {
257257
AnnotationKey OPENTELEMETRY_START_TIME = AnnotationKeyFactory.of(401, "StartTimeUnixNano");
258258
AnnotationKey OPENTELEMETRY_EVENT = AnnotationKeyFactory.of(402, "Event", VIEW_IN_RECORD_SET);
259259
AnnotationKey OPENTELEMETRY_LINK = AnnotationKeyFactory.of(403, "Link", VIEW_IN_RECORD_SET);
260+
AnnotationKey OPENTELEMETRY_SPAN_ID = AnnotationKeyFactory.of(404, "SpanId");
261+
AnnotationKey OPENTELEMETRY_PARENT_SPAN_ID = AnnotationKeyFactory.of(405, "ParentSpanId");
260262
}

commons/src/main/java/com/navercorp/pinpoint/common/trace/ServiceType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,6 @@ public interface ServiceType {
411411
ServiceType OPENTELEMETRY_SERVER = of(1220, "OPENTELEMETRY_SERVER", "OPENTELEMETRY_SERVER", RECORD_STATISTICS);
412412
ServiceType OPENTELEMETRY_INTERNAL = of(1221, "OPENTELEMETRY_INTERNAL", "OPENTELEMETRY_INTERNAL");
413413
ServiceType OPENTELEMETRY_DB = of(2060, "OPENTELEMETRY_DB", "OPENTELEMETRY_DB", RECORD_STATISTICS);
414-
ServiceType OPENTELEMETRY_DB_EXECUTE_QUERY = of(2061, "OPENTELEMETRY_DB_EXECUTE_QUERY", "OPENTELEMETRY_DB", TERMINAL, RECORD_STATISTICS, INCLUDE_DESTINATION_ID);
414+
ServiceType OPENTELEMETRY_DB_EXECUTE_QUERY = of(2061, "OPENTELEMETRY_DB_EXECUTE_QUERY", "OPENTELEMETRY_DB_EXECUTE_QUERY", TERMINAL, RECORD_STATISTICS, INCLUDE_DESTINATION_ID);
415415
ServiceType OPENTELEMETRY_CLIENT = of(9310, "OPENTELEMETRY_CLIENT", "OPENTELEMETRY_CLIENT", RECORD_STATISTICS);
416416
}

otlptrace/otlptrace-collector/src/main/java/com/navercorp/pinpoint/otlp/trace/collector/OtlpTraceCollectorModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ public ObjectNameVersion serverNameVersion(@Value(ObjectNameVersion.VALUE_KEY) S
6868
}
6969

7070
@Bean
71-
public ServerServiceDefinition serverServiceDefinition(@Qualifier("hbaseOtlpTraceService") TraceService traceService, @Qualifier("hbaseOtlpAgentInfoService") HbaseOtlpAgentInfoService agentInfoService, @Qualifier("hbaseOtlpApplicationIndexV2Service") HbaseOtlpApplicationIndexV2Service applicationIndexV2Service, OtlpTraceMapper mapper) {
72-
BindableService spanService = new GrpcOtlpTraceService(traceService, agentInfoService, applicationIndexV2Service, mapper);
71+
public ServerServiceDefinition serverServiceDefinition(TraceService traceServiceList, @Qualifier("hbaseOtlpAgentInfoService") HbaseOtlpAgentInfoService agentInfoService, @Qualifier("hbaseOtlpApplicationIndexV2Service") HbaseOtlpApplicationIndexV2Service applicationIndexV2Service, OtlpTraceMapper mapper) {
72+
BindableService spanService = new GrpcOtlpTraceService(traceServiceList, agentInfoService, applicationIndexV2Service, mapper);
7373
return ServerInterceptors.intercept(spanService);
7474
}
7575

otlptrace/otlptrace-collector/src/main/java/com/navercorp/pinpoint/otlp/trace/collector/mapper/OtlpTraceConstants.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,30 @@
2121

2222
public class OtlpTraceConstants {
2323
public static final String ATTRIBUTE_KEY_CLIENT_ADDRESS = "client.address";
24+
public static final String ATTRIBUTE_KEY_PEER_ADDRESS = "peer.address";
25+
public static final String ATTRIBUTE_KEY_NET_PEER_IP = "net.peer.ip";
26+
public static final String ATTRIBUTE_KEY_NETWORK_PEER_IP = "network.peer.address";
27+
public static final String ATTRIBUTE_KEY_NETWORK_PEER_PORT = "network.peer.port";
2428
public static final String ATTRIBUTE_KEY_HTTP_RESPONSE_STATUS_CODE = "http.response.status_code";
29+
public static final String ATTRIBUTE_KEY_HTTP_STATUS_CODE = "http.status_code";
2530
public static final String ATTRIBUTE_KEY_MESSAGING_KAFKA_MESSAGE_OFFSET = "messaging.kafka.message.offset";
2631
public static final String ATTRIBUTE_KEY_MESSAGING_DESTINATION_PARTITION_ID = "messaging.destination.partition.id";
2732
public static final String ATTRIBUTE_KEY_MESSAGING_DESTINATION_NAME = "messaging.destination.name";
2833
public static final String ATTRIBUTE_KEY_URL_PATH = "url.path";
34+
public static final String ATTRIBUTE_KEY_HTTP_URL = "http.url";
35+
public static final String ATTRIBUTE_KEY_HTTP_TARGET = "http.target";
36+
public static final String ATTRIBUTE_KEY_RPC_SERVICE = "rpc.service";
37+
public static final String ATTRIBUTE_KEY_RPC_METHOD = "rpc.method";
2938
public static final String ATTRIBUTE_KEY_MESSAGING_CLIENT_ID = "messaging.client_id";
3039
public static final String ATTRIBUTE_KEY_SERVER_PORT = "server.port";
3140
public static final String ATTRIBUTE_KEY_SERVER_ADDRESS = "server.address";
41+
public static final String ATTRIBUTE_KEY_UPSTREAM_ADDRESS = "upstream_address";
3242
public static final String ATTRIBUTE_KEY_DB_NAME = "db.name";
43+
public static final String ATTRIBUTE_KEY_UPSTREAM_CLUSTER_NAME = "upstream_cluster.name";
3344
public static final String ATTRIBUTE_KEY_DB_STATEMENT = "db.statement";
45+
public static final String ATTRIBUTE_KEY_DB_QUERY_TEXT = "db.query.text";
3446
public static final String ATTRIBUTE_KEY_DB_SYSTEM = "db.system";
47+
public static final String ATTRIBUTE_KEY_DB_SYSTEM_NAME = "db.system.name";
3548

3649
public static final Map<String, Boolean> FILTERED_ATTRIBUTE_KEY_MAP = Map.ofEntries(
3750
Map.entry(ATTRIBUTE_KEY_CLIENT_ADDRESS, Boolean.TRUE),

otlptrace/otlptrace-collector/src/main/java/com/navercorp/pinpoint/otlp/trace/collector/mapper/OtlpTraceMapper.java

Lines changed: 139 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package com.navercorp.pinpoint.otlp.trace.collector.mapper;
1818

19+
import com.google.protobuf.ByteString;
1920
import com.navercorp.pinpoint.common.server.bo.AgentInfoBo;
2021
import com.navercorp.pinpoint.common.server.bo.SpanBo;
2122
import com.navercorp.pinpoint.common.server.bo.SpanChunkBo;
23+
import com.navercorp.pinpoint.common.server.bo.SpanEventBo;
2224
import com.navercorp.pinpoint.otlp.trace.collector.OtlpTraceCollectorRejectedSpan;
2325
import io.opentelemetry.proto.common.v1.KeyValue;
2426
import io.opentelemetry.proto.trace.v1.ResourceSpans;
@@ -28,7 +30,13 @@
2830
import org.apache.logging.log4j.Logger;
2931
import org.springframework.stereotype.Component;
3032

33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.HashSet;
3136
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Set;
39+
import java.util.concurrent.TimeUnit;
3240

3341
@Component
3442
public class OtlpTraceMapper {
@@ -42,17 +50,23 @@ public class OtlpTraceMapper {
4250
public static final String LINK_METHOD_NAME = "Link";
4351

4452
private final OtlpTraceSpanMapper spanMapper;
53+
private final OtlpTraceSpanEventMapper spanEventMapper;
4554
private final OtlpTraceSpanChunkMapper spanChunkMapper;
4655
private final OtlpAgentInfoMapper agentInfoMapper;
4756

48-
public OtlpTraceMapper(OtlpTraceSpanMapper spanMapper, OtlpTraceSpanChunkMapper spanChunkMapper, OtlpAgentInfoMapper agentInfoMapper) {
57+
public OtlpTraceMapper(OtlpTraceSpanMapper spanMapper, OtlpTraceSpanEventMapper spanEventMapper, OtlpTraceSpanChunkMapper spanChunkMapper, OtlpAgentInfoMapper agentInfoMapper) {
4958
this.spanMapper = spanMapper;
59+
this.spanEventMapper = spanEventMapper;
5060
this.spanChunkMapper = spanChunkMapper;
5161
this.agentInfoMapper = agentInfoMapper;
5262
}
5363

64+
// sort by traceId
65+
// find root span, server type, no parentSpanId
66+
// link span, find parentSpanId
5467
public OtlpTraceMapperData map(List<ResourceSpans> resourceSpanList) {
5568
final OtlpTraceMapperData mapperData = new OtlpTraceMapperData();
69+
int errorCount = 0;
5670
for (ResourceSpans resourceSpan : resourceSpanList) {
5771
final IdAndName idAndName = getId(mapperData, resourceSpan);
5872
if (idAndName == null) {
@@ -62,45 +76,53 @@ public OtlpTraceMapperData map(List<ResourceSpans> resourceSpanList) {
6276

6377
final List<KeyValue> attributesList = resourceSpan.getResource().getAttributesList();
6478
final List<ScopeSpans> scopeSpanList = resourceSpan.getScopeSpansList();
65-
for (ScopeSpans scopeSpan : scopeSpanList) {
66-
List<Span> spansList = scopeSpan.getSpansList();
67-
int errorCount = 0;
68-
for (Span span : spansList) {
79+
final Map<ByteString, List<Span>> spanMap = getSpanMap(scopeSpanList);
80+
81+
// find root span, server type, no parentSpanId
82+
for (Map.Entry<ByteString, List<Span>> entry : spanMap.entrySet()) {
83+
List<Span> rootSpanList = new ArrayList<>();
84+
List<Span> childSpanList = new ArrayList<>();
85+
initRootAndChild(entry.getValue(), rootSpanList, childSpanList);
86+
87+
for (Span rootSpan : rootSpanList) {
88+
try {
89+
final SpanBo spanBo = spanMapper.map(idAndName, rootSpan);
90+
final List<SpanEventBo> spanEventList = findLinkSpan(spanBo.getStartTime(), childSpanList, rootSpan.getSpanId(), 1);
91+
spanBo.addSpanEventBoList(spanEventList);
92+
mapperData.addSpanBo(spanBo);
93+
final AgentInfoBo agentInfoBo = agentInfoMapper.map(spanBo, attributesList);
94+
mapperData.addAgentInfoBo(agentInfoBo);
95+
} catch (Exception e) {
96+
errorCount++;
97+
logger.warn("Failed to map span", e);
98+
}
99+
}
100+
101+
// Build trees from remaining child spans (orphan sub-traces)
102+
if (!childSpanList.isEmpty()) {
69103
try {
70-
switch (span.getKind().getNumber()) {
71-
case Span.SpanKind.SPAN_KIND_SERVER_VALUE,
72-
Span.SpanKind.SPAN_KIND_CONSUMER_VALUE -> {
73-
final SpanBo spanBo = spanMapper.map(idAndName, span);
74-
mapperData.addSpanBo(spanBo);
75-
final AgentInfoBo agentInfoBo = agentInfoMapper.map(spanBo, attributesList);
76-
mapperData.addAgentInfoBo(agentInfoBo);
77-
}
78-
case Span.SpanKind.SPAN_KIND_CLIENT_VALUE -> {
79-
final SpanChunkBo spanChunkBo = spanChunkMapper.map(idAndName, span);
80-
mapperData.addSpanChunkBo(spanChunkBo);
81-
}
82-
case Span.SpanKind.SPAN_KIND_PRODUCER_VALUE -> {
83-
final SpanChunkBo spanChunkBo = spanChunkMapper.map(idAndName, span);
84-
mapperData.addSpanChunkBo(spanChunkBo);
85-
}
86-
default -> {
87-
final SpanChunkBo spanChunkBo = spanChunkMapper.map(idAndName, span);
88-
mapperData.addSpanChunkBo(spanChunkBo);
89-
}
104+
List<SpanChunkBo> spanChunkBoList = findLinkSpanChunk(idAndName, childSpanList);
105+
for (SpanChunkBo spanChunkBo : spanChunkBoList) {
106+
mapperData.addSpanChunkBo(spanChunkBo);
90107
}
91108
} catch (Exception e) {
92109
errorCount++;
93-
logger.warn("Failed to map", e);
110+
logger.warn("Failed to map spanChunk", e);
94111
}
95112
}
96-
if (errorCount > 0) {
97-
OtlpTraceCollectorRejectedSpan rejectedSpan = mapperData.getRejectedSpan();
98-
rejectedSpan.putMessage("mapping error (" + errorCount + ")");
99-
rejectedSpan.addCount(errorCount);
113+
114+
if (!childSpanList.isEmpty()) {
115+
logger.warn("Unknown spans={}", childSpanList);
100116
}
101117
}
102118
}
103119

120+
121+
if (errorCount > 0) {
122+
OtlpTraceCollectorRejectedSpan rejectedSpan = mapperData.getRejectedSpan();
123+
rejectedSpan.putMessage("mapping error (" + errorCount + ")");
124+
rejectedSpan.addCount(errorCount);
125+
}
104126
return mapperData;
105127
}
106128

@@ -116,4 +138,91 @@ IdAndName getId(OtlpTraceMapperData mapperData, ResourceSpans resourceSpan) {
116138
return null;
117139
}
118140
}
141+
142+
Map<ByteString, List<Span>> getSpanMap(List<ScopeSpans> scopeSpanList) {
143+
Map<ByteString, List<Span>> spanMap = new HashMap<>();
144+
for (ScopeSpans scopeSpan : scopeSpanList) {
145+
List<Span> spansList = scopeSpan.getSpansList();
146+
for (Span span : spansList) {
147+
spanMap.computeIfAbsent(span.getTraceId(), k -> new ArrayList<>()).add(span);
148+
}
149+
}
150+
return spanMap;
151+
}
152+
153+
void initRootAndChild(List<Span> spanList, List<Span> rootSpanList, List<Span> childSpanList) {
154+
for (Span span : spanList) {
155+
if (span.getKind().getNumber() == Span.SpanKind.SPAN_KIND_SERVER_VALUE) {
156+
rootSpanList.add(span);
157+
} else if (span.getKind().getNumber() == Span.SpanKind.SPAN_KIND_CONSUMER_VALUE) {
158+
rootSpanList.add(span);
159+
} else {
160+
// client, producer, internal
161+
if (span.getParentSpanId().isEmpty()) {
162+
// even the client type can be root if there is no parentSpanId value.
163+
rootSpanList.add(span);
164+
} else {
165+
childSpanList.add(span);
166+
}
167+
}
168+
}
169+
}
170+
171+
List<SpanEventBo> findLinkSpan(long startTime, List<Span> childSpanList, ByteString parentSpanId, int depth) {
172+
List<SpanEventBo> spanEventList = new ArrayList<>();
173+
if (depth > 99) {
174+
// defensive check
175+
return spanEventList;
176+
}
177+
178+
List<Span> linkSpanList = new ArrayList<>();
179+
for (Span span : childSpanList) {
180+
if (parentSpanId.equals(span.getParentSpanId())) {
181+
linkSpanList.add(span);
182+
}
183+
}
184+
childSpanList.removeAll(linkSpanList);
185+
186+
for (Span span : linkSpanList) {
187+
final SpanEventBo spanEventBo = spanEventMapper.map(startTime, span, depth);
188+
spanEventList.add(spanEventBo);
189+
List<SpanEventBo> list = findLinkSpan(startTime, childSpanList, span.getSpanId(), depth + 1);
190+
spanEventList.addAll(list);
191+
}
192+
193+
return spanEventList;
194+
}
195+
196+
List<SpanChunkBo> findLinkSpanChunk(IdAndName idAndName, List<Span> childSpanList) {
197+
// Collect all spanIds for quick lookup
198+
List<SpanChunkBo> spanChunkList = new ArrayList<>();
199+
Set<ByteString> spanIdSet = new HashSet<>();
200+
for (Span s : childSpanList) {
201+
spanIdSet.add(s.getSpanId());
202+
}
203+
// Identify local roots: parentSpanId is empty or not present in current set
204+
List<Span> localRootSpanList = new ArrayList<>();
205+
for (Span s : childSpanList) {
206+
if (!spanIdSet.contains(s.getParentSpanId())) {
207+
localRootSpanList.add(s);
208+
}
209+
}
210+
// If no local root (possible cycle), pick one arbitrarily
211+
if (localRootSpanList.isEmpty()) {
212+
localRootSpanList.addAll(childSpanList);
213+
}
214+
for (Span localRootSpan : localRootSpanList) {
215+
// Remove root from remaining list so it's not processed again
216+
childSpanList.remove(localRootSpan);
217+
// Map root as SpanChunk (attached to its parentSpanId if present)
218+
SpanChunkBo spanChunkBo = spanChunkMapper.map(idAndName, localRootSpan);
219+
long rootStartTime = TimeUnit.NANOSECONDS.toMillis(localRootSpan.getStartTimeUnixNano());
220+
// Recursively attach children as events
221+
List<SpanEventBo> childrenEvents = findLinkSpan(rootStartTime, childSpanList, localRootSpan.getSpanId(), 2);
222+
spanChunkBo.addSpanEventBoList(childrenEvents);
223+
spanChunkList.add(spanChunkBo);
224+
}
225+
226+
return spanChunkList;
227+
}
119228
}

otlptrace/otlptrace-collector/src/main/java/com/navercorp/pinpoint/otlp/trace/collector/mapper/OtlpTraceMapperUtils.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,40 @@ public static IdAndName getId(List<KeyValue> attributesList) {
4343
if (agentId == null) {
4444
final String serviceInstanceId = attributesList.stream().filter(kv -> kv.getKey().equals(KEY_SERVICE_INSTANCE_ID)).findFirst().map(kv -> kv.getValue().getStringValue()).orElse(null);
4545
if (serviceInstanceId == null) {
46-
throw new IllegalArgumentException("not found agentId");
47-
}
48-
// check UUID
49-
if (serviceInstanceId.length() == 36) {
50-
final UUID uuid = UUID.fromString(serviceInstanceId);
51-
final String encoded = Base64Utils.encode(uuid.toString());
52-
return new IdAndName(encoded, serviceInstanceId, getApplicationName(attributesList));
53-
}
54-
// agentId
55-
if (!IdValidateUtils.validateId(serviceInstanceId, PinpointConstants.AGENT_ID_MAX_LEN)) {
56-
throw new IllegalArgumentException("invalid agentId=" + serviceInstanceId);
46+
final String hostName = attributesList.stream().filter(kv -> kv.getKey().equals("host.name")).findFirst().map(kv -> kv.getValue().getStringValue()).orElse(null);
47+
if (hostName != null) {
48+
if (!IdValidateUtils.validateId(hostName, PinpointConstants.AGENT_ID_MAX_LEN)) {
49+
throw new IllegalArgumentException("invalid host.name=" + hostName);
50+
}
51+
return new IdAndName(hostName, null, getApplicationName(attributesList));
52+
}
53+
// TODO
54+
final String applicationName = getApplicationName(attributesList);
55+
if (!IdValidateUtils.validateId(applicationName, PinpointConstants.AGENT_ID_MAX_LEN)) {
56+
throw new IllegalArgumentException("invalid agentId(derived from applicationName)=" + applicationName);
57+
}
58+
return new IdAndName(applicationName, null, applicationName);
59+
} else {
60+
// check UUID safely
61+
if (serviceInstanceId.length() == 36) {
62+
try {
63+
final UUID uuid = UUID.fromString(serviceInstanceId);
64+
final String encoded = Base64Utils.encode(uuid);
65+
return new IdAndName(encoded, serviceInstanceId, getApplicationName(attributesList));
66+
} catch (IllegalArgumentException ignore) {
67+
// not a valid UUID string, fall through to treat as plain agentId
68+
}
69+
}
70+
// agentId
71+
if (!IdValidateUtils.validateId(serviceInstanceId, PinpointConstants.AGENT_ID_MAX_LEN)) {
72+
throw new IllegalArgumentException("invalid service.instance.id=" + serviceInstanceId);
73+
}
5774
}
5875
return new IdAndName(serviceInstanceId, null, getApplicationName(attributesList));
5976
}
6077

6178
if (!IdValidateUtils.validateId(agentId, PinpointConstants.AGENT_ID_MAX_LEN)) {
62-
throw new IllegalArgumentException("invalid agentId=" + agentId);
79+
throw new IllegalArgumentException("invalid pinpoint.agentId=" + agentId);
6380
}
6481

6582
return new IdAndName(agentId, null, getApplicationName(attributesList));

0 commit comments

Comments
 (0)