77import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .DROPPED_SPANS_COUNTER ;
88import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .INFLIGHT_TRACE_MAX_SPAN_COUNT ;
99import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .OUTPUT_TOPIC_PRODUCER ;
10+ import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG ;
11+ import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .PEER_CORRELATION_ENABLED_AGENTS ;
12+ import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .PEER_CORRELATION_ENABLED_CUSTOMERS ;
13+ import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE ;
1014import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .RAW_SPANS_GROUPER_JOB_CONFIG ;
1115import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY ;
1216import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .SPAN_STATE_STORE_NAME ;
1519import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .TRACE_EMIT_PUNCTUATOR_STORE_NAME ;
1620import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .TRACE_STATE_STORE ;
1721import static org .hypertrace .core .rawspansgrouper .RawSpanGrouperConstants .TRUNCATED_TRACES_COUNTER ;
22+ import static org .hypertrace .traceenricher .enrichedspan .constants .EnrichedSpanConstants .PEER_SERVICE_NAME ;
1823
1924import com .typesafe .config .Config ;
2025import io .micrometer .core .instrument .Counter ;
2328import java .time .Clock ;
2429import java .time .Duration ;
2530import java .time .Instant ;
31+ import java .util .Collections ;
2632import java .util .HashMap ;
2733import java .util .List ;
2834import java .util .Map ;
35+ import java .util .Objects ;
36+ import java .util .Optional ;
2937import java .util .concurrent .ConcurrentHashMap ;
3038import java .util .concurrent .ConcurrentMap ;
3139import java .util .concurrent .TimeUnit ;
3644import org .apache .kafka .streams .processor .api .ProcessorContext ;
3745import org .apache .kafka .streams .processor .api .Record ;
3846import org .apache .kafka .streams .state .KeyValueStore ;
47+ import org .hypertrace .core .datamodel .Event ;
3948import org .hypertrace .core .datamodel .RawSpan ;
4049import org .hypertrace .core .datamodel .StructuredTrace ;
50+ import org .hypertrace .core .datamodel .Timestamps ;
4151import org .hypertrace .core .datamodel .shared .HexUtils ;
52+ import org .hypertrace .core .datamodel .shared .SpanAttributeUtils ;
53+ import org .hypertrace .core .datamodel .shared .trace .AttributeValueCreator ;
54+ import org .hypertrace .core .datamodel .shared .trace .StructuredTraceBuilder ;
4255import org .hypertrace .core .kafkastreams .framework .punctuators .ThrottledPunctuatorConfig ;
56+ import org .hypertrace .core .rawspansgrouper .utils .TraceLatencyMeter ;
57+ import org .hypertrace .core .rawspansgrouper .validator .PeerIdentityValidator ;
4358import org .hypertrace .core .serviceframework .metrics .PlatformMetricsRegistry ;
59+ import org .hypertrace .core .spannormalizer .IpIdentity ;
60+ import org .hypertrace .core .spannormalizer .PeerIdentity ;
4461import org .hypertrace .core .spannormalizer .SpanIdentity ;
62+ import org .hypertrace .core .spannormalizer .SpanMetadata ;
4563import org .hypertrace .core .spannormalizer .TraceIdentity ;
4664import org .hypertrace .core .spannormalizer .TraceState ;
65+ import org .hypertrace .semantic .convention .utils .http .HttpSemanticConventionUtils ;
66+ import org .hypertrace .semantic .convention .utils .span .SpanSemanticConventionUtils ;
4767import org .slf4j .Logger ;
4868import org .slf4j .LoggerFactory ;
4969
@@ -60,6 +80,7 @@ public class RawSpansProcessor
6080 private static final Logger logger = LoggerFactory .getLogger (RawSpansProcessor .class );
6181 private static final String PROCESSING_LATENCY_TIMER =
6282 "hypertrace.rawspansgrouper.processing.latency" ;
83+ private static final String ALL = "*" ;
6384 private static final ConcurrentMap <String , Timer > tenantToSpansGroupingTimer =
6485 new ConcurrentHashMap <>();
6586 // counter for number of spans dropped per tenant
@@ -69,25 +90,46 @@ public class RawSpansProcessor
6990 // counter for number of truncated traces per tenant
7091 private static final ConcurrentMap <String , Counter > truncatedTracesCounter =
7192 new ConcurrentHashMap <>();
93+ private ProcessorContext <TraceIdentity , StructuredTrace > context ;
7294 private final Clock clock ;
7395 private KeyValueStore <SpanIdentity , RawSpan > spanStore ;
7496 private KeyValueStore <TraceIdentity , TraceState > traceStateStore ;
97+ private KeyValueStore <PeerIdentity , SpanMetadata > peerIdentityToSpanMetadataStateStore ;
7598 private long groupingWindowTimeoutMs ;
7699 private double dataflowSamplingPercent = -1 ;
77100 private static final Map <String , Long > maxSpanCountMap = new HashMap <>();
78101 private long defaultMaxSpanCountLimit = Long .MAX_VALUE ;
79102 private TraceEmitPunctuator traceEmitPunctuator ;
80103 private Cancellable traceEmitTasksPunctuatorCancellable ;
104+ private String peerCorrelationAgentTypeAttribute ;
105+ private List <String > peerCorrelationEnabledCustomers ;
106+ private List <String > peerCorrelationEnabledAgents ;
107+ private TraceLatencyMeter traceLatencyMeter ;
81108
82109 public RawSpansProcessor (Clock clock ) {
83110 this .clock = clock ;
84111 }
85112
86113 @ Override
87114 public void init (ProcessorContext <TraceIdentity , StructuredTrace > context ) {
115+ this .context = context ;
88116 this .spanStore = context .getStateStore (SPAN_STATE_STORE_NAME );
89117 this .traceStateStore = context .getStateStore (TRACE_STATE_STORE );
118+ this .peerIdentityToSpanMetadataStateStore =
119+ context .getStateStore (PEER_IDENTITY_TO_SPAN_METADATA_STATE_STORE );
90120 Config jobConfig = (Config ) (context .appConfigs ().get (RAW_SPANS_GROUPER_JOB_CONFIG ));
121+ this .peerCorrelationAgentTypeAttribute =
122+ jobConfig .hasPath (PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG )
123+ ? jobConfig .getString (PEER_CORRELATION_AGENT_TYPE_ATTRIBUTE_CONFIG )
124+ : null ;
125+ this .peerCorrelationEnabledCustomers =
126+ jobConfig .hasPath (PEER_CORRELATION_ENABLED_CUSTOMERS )
127+ ? jobConfig .getStringList (PEER_CORRELATION_ENABLED_CUSTOMERS )
128+ : Collections .emptyList ();
129+ this .peerCorrelationEnabledAgents =
130+ jobConfig .hasPath (PEER_CORRELATION_ENABLED_AGENTS )
131+ ? jobConfig .getStringList (PEER_CORRELATION_ENABLED_AGENTS )
132+ : Collections .emptyList ();
91133 this .groupingWindowTimeoutMs =
92134 jobConfig .getLong (SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY ) * 1000 ;
93135
@@ -96,15 +138,13 @@ public void init(ProcessorContext<TraceIdentity, StructuredTrace> context) {
96138 && jobConfig .getDouble (DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY ) <= 100 ) {
97139 this .dataflowSamplingPercent = jobConfig .getDouble (DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY );
98140 }
141+ this .traceLatencyMeter = new TraceLatencyMeter (dataflowSamplingPercent );
99142
100143 if (jobConfig .hasPath (INFLIGHT_TRACE_MAX_SPAN_COUNT )) {
101144 Config subConfig = jobConfig .getConfig (INFLIGHT_TRACE_MAX_SPAN_COUNT );
102145 subConfig
103146 .entrySet ()
104- .forEach (
105- (entry ) -> {
106- maxSpanCountMap .put (entry .getKey (), subConfig .getLong (entry .getKey ()));
107- });
147+ .forEach (entry -> maxSpanCountMap .put (entry .getKey (), subConfig .getLong (entry .getKey ())));
108148 }
109149
110150 if (jobConfig .hasPath (DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT )) {
@@ -151,15 +191,23 @@ public void process(Record<TraceIdentity, RawSpan> record) {
151191 TraceIdentity key = record .key ();
152192 RawSpan value = record .value ();
153193 TraceState traceState = traceStateStore .get (key );
154- boolean firstEntry = (traceState == null );
155194
156195 if (shouldDropSpan (key , traceState )) {
157196 return ;
158197 }
159198
199+ Event event = value .getEvent ();
200+ // spans whose trace is not created by ByPass flow, their trace will be created in below
201+ // function call. Those spans' trace will not be created by TraceEmitPunctuator
202+ if (isPeerServiceNameIdentificationRequired (event )) {
203+ processSpanForPeerServiceNameIdentification (key , value , traceState , currentTimeMs );
204+ return ;
205+ }
206+
207+ boolean firstEntry = (traceState == null );
160208 String tenantId = key .getTenantId ();
161209 ByteBuffer traceId = value .getTraceId ();
162- ByteBuffer spanId = value . getEvent () .getEventId ();
210+ ByteBuffer spanId = event .getEventId ();
163211 spanStore .put (new SpanIdentity (tenantId , traceId , spanId ), value );
164212
165213 if (firstEntry ) {
@@ -196,7 +244,104 @@ public void process(Record<TraceIdentity, RawSpan> record) {
196244 .record (Duration .between (start , Instant .now ()).toMillis (), TimeUnit .MILLISECONDS );
197245 // no need to do context.forward. the punctuator will emit the trace once it's eligible to be
198246 // emitted
199- return ;
247+ }
248+
249+ private void processSpanForPeerServiceNameIdentification (
250+ TraceIdentity key , RawSpan value , TraceState traceState , long currentTimeMs ) {
251+ Event event = value .getEvent ();
252+ String tenantId = key .getTenantId ();
253+ ByteBuffer traceId = value .getTraceId ();
254+ boolean firstEntry = (traceState == null );
255+ Optional <String > maybeEnvironment = HttpSemanticConventionUtils .getEnvironmentForSpan (event );
256+ if (SpanSemanticConventionUtils .isClientSpanForOCFormat (
257+ event .getAttributes ().getAttributeMap ())) {
258+ handleClientSpan (tenantId , event , maybeEnvironment .orElse (null ));
259+ } else {
260+ handleServerSpan (tenantId , event , maybeEnvironment .orElse (null ));
261+ }
262+
263+ // create structured trace and forward
264+ Timestamps timestamps =
265+ traceLatencyMeter .trackEndToEndLatencyTimestamps (
266+ currentTimeMs , firstEntry ? currentTimeMs : traceState .getTraceStartTimestamp ());
267+ StructuredTrace trace =
268+ StructuredTraceBuilder .buildStructuredTraceFromRawSpans (
269+ List .of (value ), traceId , tenantId , timestamps );
270+ context .forward (new Record <>(key , trace , currentTimeMs ), OUTPUT_TOPIC_PRODUCER );
271+ }
272+
273+ // put the peer service identity and corresponding service name in state store
274+ private void handleClientSpan (String tenantId , Event event , String environment ) {
275+ Optional <String > maybeHostAddr = HttpSemanticConventionUtils .getHostIpAddress (event );
276+ Optional <String > maybePeerAddr = HttpSemanticConventionUtils .getPeerIpAddress (event );
277+ Optional <String > maybePeerPort = HttpSemanticConventionUtils .getPeerPort (event );
278+ String serviceName = event .getServiceName ();
279+ PeerIdentity peerIdentity =
280+ PeerIdentity .newBuilder ()
281+ .setIpIdentity (
282+ IpIdentity .newBuilder ()
283+ .setTenantId (tenantId )
284+ .setEnvironment (environment )
285+ .setHostAddr (maybeHostAddr .orElse (null ))
286+ .setPeerAddr (maybePeerAddr .orElse (null ))
287+ .setPeerPort (maybePeerPort .orElse (null ))
288+ .build ())
289+ .build ();
290+ if (PeerIdentityValidator .isValid (peerIdentity )) {
291+ this .peerIdentityToSpanMetadataStateStore .put (
292+ peerIdentity ,
293+ SpanMetadata .newBuilder ()
294+ .setServiceName (serviceName )
295+ .setEventId (event .getEventId ())
296+ .build ());
297+ }
298+ }
299+
300+ // get the service name for that peer service identity and correlate the current span
301+ private void handleServerSpan (String tenantId , Event event , String environment ) {
302+ Optional <String > maybePeerAddr = HttpSemanticConventionUtils .getPeerIpAddress (event );
303+ Optional <String > maybeHostAddr = HttpSemanticConventionUtils .getHostIpAddress (event );
304+ Optional <String > maybeHostPort = HttpSemanticConventionUtils .getHostPort (event );
305+ PeerIdentity peerIdentity =
306+ PeerIdentity .newBuilder ()
307+ .setIpIdentity (
308+ IpIdentity .newBuilder ()
309+ .setTenantId (tenantId )
310+ .setEnvironment (environment )
311+ .setHostAddr (maybePeerAddr .orElse (null ))
312+ .setPeerAddr (maybeHostAddr .orElse (null ))
313+ .setPeerPort (maybeHostPort .orElse (null ))
314+ .build ())
315+ .build ();
316+ if (PeerIdentityValidator .isValid (peerIdentity )) {
317+ SpanMetadata spanMetadata = this .peerIdentityToSpanMetadataStateStore .get (peerIdentity );
318+ if (Objects .nonNull (spanMetadata )) {
319+ logger .debug (
320+ "Adding {} as: {} from spanId: {} in spanId: {} with service name: {}" ,
321+ PEER_SERVICE_NAME ,
322+ spanMetadata .getServiceName (),
323+ HexUtils .getHex (spanMetadata .getEventId ()),
324+ HexUtils .getHex (event .getEventId ()),
325+ event .getServiceName ());
326+
327+ event
328+ .getEnrichedAttributes ()
329+ .getAttributeMap ()
330+ .put (PEER_SERVICE_NAME , AttributeValueCreator .create (spanMetadata .getServiceName ()));
331+ }
332+ }
333+ }
334+
335+ private boolean isPeerServiceNameIdentificationRequired (Event event ) {
336+ if (!this .peerCorrelationEnabledCustomers .contains (event .getCustomerId ())
337+ && !this .peerCorrelationEnabledCustomers .contains (ALL )) {
338+ return false ;
339+ }
340+
341+ String agentType =
342+ SpanAttributeUtils .getStringAttributeWithDefault (
343+ event , this .peerCorrelationAgentTypeAttribute , null );
344+ return Objects .nonNull (agentType ) && this .peerCorrelationEnabledAgents .contains (agentType );
200345 }
201346
202347 private boolean shouldDropSpan (TraceIdentity key , TraceState traceState ) {
0 commit comments