3939import java .util .HashMap ;
4040import java .util .Iterator ;
4141import java .util .LinkedHashMap ;
42+ import java .util .LinkedList ;
4243import java .util .List ;
4344import java .util .Map ;
4445import java .util .concurrent .ConcurrentHashMap ;
@@ -54,11 +55,11 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
5455 static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit .MINUTES .toNanos (5 );
5556
5657 private static final StatsPoint REPORT =
57- new StatsPoint (Collections .emptyList (), 0 , 0 , 0 , 0 , 0 , 0 , 0 );
58+ new StatsPoint (Collections .emptyList (), 0 , 0 , 0 , 0 , 0 , 0 , 0 , null );
5859 private static final StatsPoint POISON_PILL =
59- new StatsPoint (Collections .emptyList (), 0 , 0 , 0 , 0 , 0 , 0 , 0 );
60+ new StatsPoint (Collections .emptyList (), 0 , 0 , 0 , 0 , 0 , 0 , 0 , null );
6061
61- private final Map <Long , StatsBucket > timeToBucket = new HashMap <>();
62+ private final Map <Long , Map < String , StatsBucket > > timeToBucket = new HashMap <>();
6263 private final MpscArrayQueue <InboxItem > inbox = new MpscArrayQueue <>(1024 );
6364 private final DatastreamsPayloadWriter payloadWriter ;
6465 private final DDAgentFeaturesDiscovery features ;
@@ -74,6 +75,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
7475 private volatile boolean agentSupportsDataStreams = false ;
7576 private volatile boolean configSupportsDataStreams = false ;
7677 private final ConcurrentHashMap <String , SchemaSampler > schemaSamplers ;
78+ private static final ThreadLocal <String > serviceNameOverride = new ThreadLocal <>();
7779
7880 public DefaultDataStreamsMonitoring (
7981 Config config ,
@@ -184,10 +186,29 @@ public void setProduceCheckpoint(String type, String target) {
184186 setProduceCheckpoint (type , target , DataStreamsContextCarrier .NoOp .INSTANCE , false );
185187 }
186188
189+ @ Override
190+ public void setThreadServiceName (String serviceName ) {
191+ if (serviceName == null ) {
192+ clearThreadServiceName ();
193+ return ;
194+ }
195+
196+ serviceNameOverride .set (serviceName );
197+ }
198+
199+ @ Override
200+ public void clearThreadServiceName () {
201+ serviceNameOverride .remove ();
202+ }
203+
204+ private static String getThreadServiceName () {
205+ return serviceNameOverride .get ();
206+ }
207+
187208 @ Override
188209 public PathwayContext newPathwayContext () {
189210 if (configSupportsDataStreams ) {
190- return new DefaultPathwayContext (timeSource , hashOfKnownTags );
211+ return new DefaultPathwayContext (timeSource , hashOfKnownTags , getThreadServiceName () );
191212 } else {
192213 return AgentTracer .NoopPathwayContext .INSTANCE ;
193214 }
@@ -196,7 +217,7 @@ public PathwayContext newPathwayContext() {
196217 @ Override
197218 public HttpCodec .Extractor extractor (HttpCodec .Extractor delegate ) {
198219 return new DataStreamContextExtractor (
199- delegate , timeSource , traceConfigSupplier , hashOfKnownTags );
220+ delegate , timeSource , traceConfigSupplier , hashOfKnownTags , getThreadServiceName () );
200221 }
201222
202223 @ Override
@@ -212,7 +233,8 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
212233 carrier ,
213234 DataStreamsContextCarrierAdapter .INSTANCE ,
214235 this .timeSource ,
215- this .hashOfKnownTags );
236+ this .hashOfKnownTags ,
237+ getThreadServiceName ());
216238 ((DDSpan ) span ).context ().mergePathwayContext (pathwayContext );
217239 }
218240 }
@@ -226,7 +248,7 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
226248 }
227249 tags .add (tag );
228250 }
229- inbox .offer (new Backlog (tags , value , timeSource .getCurrentTimeNanos ()));
251+ inbox .offer (new Backlog (tags , value , timeSource .getCurrentTimeNanos (), getThreadServiceName () ));
230252 }
231253
232254 @ Override
@@ -308,6 +330,15 @@ public void close() {
308330 }
309331
310332 private class InboxProcessor implements Runnable {
333+
334+ private StatsBucket getStatsBucket (final long timestamp , final String serviceNameOverride ) {
335+ long bucket = currentBucket (timestamp );
336+ Map <String , StatsBucket > statsBucketMap =
337+ timeToBucket .computeIfAbsent (bucket , startTime -> new HashMap <>(1 ));
338+ return statsBucketMap .computeIfAbsent (
339+ serviceNameOverride , s -> new StatsBucket (bucket , bucketDurationNanos ));
340+ }
341+
311342 @ Override
312343 public void run () {
313344 Thread currentThread = Thread .currentThread ();
@@ -335,17 +366,14 @@ public void run() {
335366 } else if (supportsDataStreams ) {
336367 if (payload instanceof StatsPoint ) {
337368 StatsPoint statsPoint = (StatsPoint ) payload ;
338- Long bucket = currentBucket (statsPoint .getTimestampNanos ());
339369 StatsBucket statsBucket =
340- timeToBucket . computeIfAbsent (
341- bucket , startTime -> new StatsBucket ( startTime , bucketDurationNanos ));
370+ getStatsBucket (
371+ statsPoint . getTimestampNanos (), statsPoint . getServiceNameOverride ( ));
342372 statsBucket .addPoint (statsPoint );
343373 } else if (payload instanceof Backlog ) {
344374 Backlog backlog = (Backlog ) payload ;
345- Long bucket = currentBucket (backlog .getTimestampNanos ());
346375 StatsBucket statsBucket =
347- timeToBucket .computeIfAbsent (
348- bucket , startTime -> new StatsBucket (startTime , bucketDurationNanos ));
376+ getStatsBucket (backlog .getTimestampNanos (), backlog .getServiceNameOverride ());
349377 statsBucket .addBacklog (backlog );
350378 }
351379 }
@@ -363,21 +391,32 @@ private long currentBucket(long timestampNanos) {
363391 private void flush (long timestampNanos ) {
364392 long currentBucket = currentBucket (timestampNanos );
365393
366- List <StatsBucket > includedBuckets = new ArrayList <>();
367- Iterator <Map .Entry <Long , StatsBucket >> mapIterator = timeToBucket .entrySet ().iterator ();
394+ // stats are grouped by time buckets and service names
395+ Map <String , List <StatsBucket >> includedBuckets = new HashMap <>();
396+ Iterator <Map .Entry <Long , Map <String , StatsBucket >>> mapIterator =
397+ timeToBucket .entrySet ().iterator ();
368398
369399 while (mapIterator .hasNext ()) {
370- Map .Entry <Long , StatsBucket > entry = mapIterator .next ();
371-
400+ Map .Entry <Long , Map <String , StatsBucket >> entry = mapIterator .next ();
372401 if (entry .getKey () < currentBucket ) {
373402 mapIterator .remove ();
374- includedBuckets .add (entry .getValue ());
403+ for (Map .Entry <String , StatsBucket > buckets : entry .getValue ().entrySet ()) {
404+ if (!includedBuckets .containsKey (buckets .getKey ())) {
405+ includedBuckets .put (buckets .getKey (), new LinkedList <>());
406+ }
407+
408+ includedBuckets .get (buckets .getKey ()).add (buckets .getValue ());
409+ }
375410 }
376411 }
377412
378413 if (!includedBuckets .isEmpty ()) {
379- log .debug ("Flushing {} buckets" , includedBuckets .size ());
380- payloadWriter .writePayload (includedBuckets );
414+ for (Map .Entry <String , List <StatsBucket >> entry : includedBuckets .entrySet ()) {
415+ if (!entry .getValue ().isEmpty ()) {
416+ log .debug ("Flushing {} buckets ({})" , entry .getValue (), entry .getKey ());
417+ payloadWriter .writePayload (entry .getValue (), entry .getKey ());
418+ }
419+ }
381420 }
382421 }
383422
0 commit comments