 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -54,11 +55,11 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);

   private static final StatsPoint REPORT =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
   private static final StatsPoint POISON_PILL =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);

-  private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
+  private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
   private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
   private final DatastreamsPayloadWriter payloadWriter;
   private final DDAgentFeaturesDiscovery features;
@@ -74,6 +75,8 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   private volatile boolean agentSupportsDataStreams = false;
   private volatile boolean configSupportsDataStreams = false;
   private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;
+  private static final ConcurrentHashMap<Long, String> threadServiceNames =
+      new ConcurrentHashMap<>();

   public DefaultDataStreamsMonitoring(
       Config config,
@@ -184,10 +187,24 @@ public void setProduceCheckpoint(String type, String target) {
     setProduceCheckpoint(type, target, DataStreamsContextCarrier.NoOp.INSTANCE, false);
   }

+  @Override
+  public void setThreadServiceName(Long threadId, String serviceName) {
+    threadServiceNames.put(threadId, serviceName);
+  }
+
+  @Override
+  public void clearThreadServiceName(Long threadId) {
+    threadServiceNames.remove(threadId);
+  }
+
+  private static String getThreadServiceNameOverride() {
+    return threadServiceNames.getOrDefault(Thread.currentThread().getId(), null);
+  }
+
   @Override
   public PathwayContext newPathwayContext() {
     if (configSupportsDataStreams) {
-      return new DefaultPathwayContext(timeSource, hashOfKnownTags);
+      return new DefaultPathwayContext(timeSource, hashOfKnownTags, getThreadServiceNameOverride());
     } else {
       return AgentTracer.NoopPathwayContext.INSTANCE;
     }
@@ -196,7 +213,7 @@ public PathwayContext newPathwayContext() {
   @Override
   public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
     return new DataStreamContextExtractor(
-        delegate, timeSource, traceConfigSupplier, hashOfKnownTags);
+        delegate, timeSource, traceConfigSupplier, hashOfKnownTags, getThreadServiceNameOverride());
   }

   @Override
@@ -212,7 +229,8 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
             carrier,
             DataStreamsContextCarrierAdapter.INSTANCE,
             this.timeSource,
-            this.hashOfKnownTags);
+            this.hashOfKnownTags,
+            getThreadServiceNameOverride());
       ((DDSpan) span).context().mergePathwayContext(pathwayContext);
     }
   }
@@ -226,7 +244,8 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
       }
       tags.add(tag);
     }
-    inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos()));
+    inbox.offer(
+        new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceNameOverride()));
   }

   @Override
@@ -308,6 +327,15 @@ public void close() {
   }

   private class InboxProcessor implements Runnable {
+
+    private StatsBucket getStatsBucket(final long timestamp, final String serviceNameOverride) {
+      long bucket = currentBucket(timestamp);
+      Map<String, StatsBucket> statsBucketMap =
+          timeToBucket.computeIfAbsent(bucket, startTime -> new HashMap<>(1));
+      return statsBucketMap.computeIfAbsent(
+          serviceNameOverride, s -> new StatsBucket(bucket, bucketDurationNanos));
+    }
+
     @Override
     public void run() {
       Thread currentThread = Thread.currentThread();
@@ -335,17 +363,14 @@ public void run() {
         } else if (supportsDataStreams) {
           if (payload instanceof StatsPoint) {
             StatsPoint statsPoint = (StatsPoint) payload;
-            Long bucket = currentBucket(statsPoint.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(
+                    statsPoint.getTimestampNanos(), statsPoint.getServiceNameOverride());
             statsBucket.addPoint(statsPoint);
           } else if (payload instanceof Backlog) {
             Backlog backlog = (Backlog) payload;
-            Long bucket = currentBucket(backlog.getTimestampNanos());
             StatsBucket statsBucket =
-                timeToBucket.computeIfAbsent(
-                    bucket, startTime -> new StatsBucket(startTime, bucketDurationNanos));
+                getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride());
             statsBucket.addBacklog(backlog);
           }
         }
@@ -363,21 +388,30 @@ private long currentBucket(long timestampNanos) {
   private void flush(long timestampNanos) {
     long currentBucket = currentBucket(timestampNanos);

-    List<StatsBucket> includedBuckets = new ArrayList<>();
-    Iterator<Map.Entry<Long, StatsBucket>> mapIterator = timeToBucket.entrySet().iterator();
+    // stats are grouped by time buckets and service names
+    Map<String, List<StatsBucket>> includedBuckets = new HashMap<>();
+    Iterator<Map.Entry<Long, Map<String, StatsBucket>>> mapIterator =
+        timeToBucket.entrySet().iterator();

     while (mapIterator.hasNext()) {
-      Map.Entry<Long, StatsBucket> entry = mapIterator.next();
-
+      Map.Entry<Long, Map<String, StatsBucket>> entry = mapIterator.next();
       if (entry.getKey() < currentBucket) {
         mapIterator.remove();
-        includedBuckets.add(entry.getValue());
+        for (Map.Entry<String, StatsBucket> buckets : entry.getValue().entrySet()) {
+          if (!includedBuckets.containsKey(buckets.getKey())) {
+            includedBuckets.put(buckets.getKey(), new LinkedList<>());
+          }
+
+          includedBuckets.get(buckets.getKey()).add(buckets.getValue());
+        }
       }
     }

     if (!includedBuckets.isEmpty()) {
-      log.debug("Flushing {} buckets", includedBuckets.size());
-      payloadWriter.writePayload(includedBuckets);
+      for (Map.Entry<String, List<StatsBucket>> entry : includedBuckets.entrySet()) {
+        log.debug("Flushing {} buckets", entry.getValue().size());
+        payloadWriter.writePayload(entry.getValue(), entry.getKey());
+      }
     }
   }

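A minimal usage sketch of the per-thread override introduced above, assuming a `dataStreamsMonitoring` reference to this class; the service name "service-b" and `doServiceBWork()` are hypothetical. Because `getThreadServiceNameOverride()` keys on `Thread.currentThread().getId()`, the override must be set and cleared on the same thread that later creates pathway contexts, records checkpoints, or tracks backlogs.

// Hypothetical caller code; not part of this change.
long threadId = Thread.currentThread().getId();
dataStreamsMonitoring.setThreadServiceName(threadId, "service-b");
try {
  // Stats points and backlogs produced on this thread are now bucketed under
  // "service-b" and written as a separate payload when flush() runs.
  doServiceBWork();
} finally {
  // Clear the override so a pooled thread falls back to the default service
  // and the static threadServiceNames map does not retain stale entries.
  dataStreamsMonitoring.clearThreadServiceName(threadId);
}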