Skip to content

Commit 5ec63df

Browse files
committed
Fixed more tests
1 parent 1c32c8d commit 5ec63df

File tree

3 files changed

+38
-21
lines changed

3 files changed

+38
-21
lines changed

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,19 +109,12 @@ public synchronized void setCheckpoint(
109109
}
110110

111111
// generate node hash
112-
long nodeHash = hashOfKnownTags;
113-
if (serviceNameOverride != null) {
114-
nodeHash = FNV64Hash.continueHash(nodeHash, serviceNameOverride, FNV64Hash.Version.v1);
115-
}
116-
nodeHash =
117-
FNV64Hash.continueHash(
118-
nodeHash, DataStreamsTags.longToBytes(context.tags().getHash()), FNV64Hash.Version.v1);
119-
112+
long nodeHash = context.tags().getHash();
120113
// loop protection - a node should not be chosen as parent
121114
// for a sequential node with the same direction, as this
122115
// will cause a `cardinality explosion` for hash / parentHash tag values
123116
DataStreamsTags.Direction direction = context.tags().getDirectionValue();
124-
if (direction == previousDirection) {
117+
if (direction == previousDirection && previousDirection != null) {
125118
hash = closestOppositeDirectionHash;
126119
} else {
127120
previousDirection = direction;

dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import datadog.trace.api.ProcessTags
77
import datadog.trace.api.TagMap
88
import datadog.trace.api.TraceConfig
99
import datadog.trace.api.WellKnownTags
10+
import datadog.trace.api.config.GeneralConfig
1011
import datadog.trace.api.datastreams.DataStreamsTags
1112
import datadog.trace.api.datastreams.StatsPoint
1213
import datadog.trace.api.time.ControllableTimeSource
1314
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation
1415
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
1516
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
1617
import datadog.trace.common.metrics.Sink
18+
import datadog.trace.core.CoreTracer
1719
import datadog.trace.core.propagation.ExtractedContext
1820
import datadog.trace.core.test.DDCoreSpecification
1921

@@ -502,9 +504,10 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
502504
503505
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
504506
507+
DataStreamsTags.setGlobalBaseHash(baseHash)
505508
def context = new DefaultPathwayContext(timeSource, baseHash, null)
506509
timeSource.advance(MILLISECONDS.toNanos(50))
507-
context.setCheckpoint(fromTags(DataStreamsTags.create("itnernal", DataStreamsTags.Direction.Inbound)), pointConsumer)
510+
context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer)
508511
def encoded = context.encode()
509512
Map<String, String> carrier = [
510513
(PROPAGATION_KEY_BASE64): encoded,
@@ -518,6 +521,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
518521
def extractedSpan = AgentSpan.fromContext(extractedContext)
519522
520523
then:
524+
encoded == "L+lDG/Pa9hRkZA=="
521525
!dynamicConfigEnabled || extractedSpan != null
522526
if (dynamicConfigEnabled) {
523527
def extracted = extractedSpan.context()
@@ -546,8 +550,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
546550
isDataStreamsEnabled() >> { return globalDsmEnabled }
547551
}
548552
553+
def tracerApi = Mock(AgentTracer.TracerAPI) {
554+
captureTraceConfig() >> globalTraceConfig
555+
}
556+
AgentTracer.TracerAPI originalTracer = AgentTracer.get()
557+
AgentTracer.forceRegister(tracerApi)
558+
549559
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
550560
561+
DataStreamsTags.setGlobalBaseHash(baseHash)
551562
def context = new DefaultPathwayContext(timeSource, baseHash, null)
552563
timeSource.advance(MILLISECONDS.toNanos(50))
553564
context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer)
@@ -562,6 +573,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
562573
def extractedSpan = AgentSpan.fromContext(extractedContext)
563574
564575
then:
576+
encoded == "L+lDG/Pa9hRkZA=="
565577
if (globalDsmEnabled) {
566578
extractedSpan != null
567579
def extracted = extractedSpan.context()
@@ -572,6 +584,9 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
572584
extractedSpan == null
573585
}
574586
587+
cleanup:
588+
AgentTracer.forceRegister(originalTracer)
589+
575590
where:
576591
globalDsmEnabled << [true, false]
577592
}
@@ -589,19 +604,27 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
589604
isDataStreamsEnabled() >> { return globalDsmEnabled }
590605
}
591606
592-
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
607+
def tracerApi = Mock(AgentTracer.TracerAPI) {
608+
captureTraceConfig() >> globalTraceConfig
609+
}
610+
AgentTracer.TracerAPI originalTracer = AgentTracer.get()
611+
AgentTracer.forceRegister(tracerApi)
612+
613+
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig },
614+
wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
593615
616+
DataStreamsTags.setGlobalBaseHash(baseHash)
594617
def context = new DefaultPathwayContext(timeSource, baseHash, null)
595618
timeSource.advance(MILLISECONDS.toNanos(50))
596-
context.setCheckpoint(fromTags(DataStreamsTags.create("internal", null)), pointConsumer)
619+
context.setCheckpoint(fromTags(DataStreamsTags.create("internal", DataStreamsTags.Direction.Inbound)), pointConsumer)
597620
def encoded = context.encode()
598621
Map<String, String> carrier = [(PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"]
599622
def contextVisitor = new Base64MapContextVisitor()
600-
def spanContext = new ExtractedContext(DDTraceId.ONE, 1, 0, null, 0, null, (TagMap)null, null, null, null, DATADOG)
623+
def spanContext = new ExtractedContext(DDTraceId.ONE, 1, 0, null, 0,
624+
null, (TagMap)null, null, null, globalTraceConfig, DATADOG)
601625
def baseContext = AgentSpan.fromSpanContext(spanContext).storeInto(root())
602626
def propagator = dataStreams.propagator()
603627
604-
605628
when:
606629
def extractedContext = propagator.extract(baseContext, carrier, contextVisitor)
607630
def extractedSpan = AgentSpan.fromContext(extractedContext)
@@ -614,13 +637,17 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
614637
615638
then:
616639
extracted != null
640+
encoded == "L+lDG/Pa9hRkZA=="
617641
if (globalDsmEnabled) {
618642
extracted.pathwayContext != null
619643
extracted.pathwayContext.isStarted()
620644
} else {
621645
extracted.pathwayContext == null
622646
}
623647
648+
cleanup:
649+
AgentTracer.forceRegister(originalTracer)
650+
624651
where:
625652
globalDsmEnabled << [true, false]
626653
}
@@ -661,4 +688,4 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
661688
}
662689
}
663690
}
664-
}
691+
}

dd-trace-core/src/traceAgentTest/groovy/DataStreamsIntegrationTest.groovy

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import datadog.communication.ddagent.SharedCommunicationObjects
33
import datadog.communication.http.OkHttpUtils
44
import datadog.trace.api.Config
55
import datadog.trace.api.TraceConfig
6+
import datadog.trace.api.datastreams.DataStreamsTags
67
import datadog.trace.api.time.ControllableTimeSource
78
import datadog.trace.api.datastreams.StatsPoint
89
import datadog.trace.common.metrics.EventListener
@@ -46,12 +47,8 @@ class DataStreamsIntegrationTest extends AbstractTraceAgentTest {
4647
when:
4748
def dataStreams = new DefaultDataStreamsMonitoring(sink, sharedCommunicationObjects.featuresDiscovery(Config.get()), timeSource, { traceConfig }, Config.get())
4849
dataStreams.start()
49-
def tags = DataStreamsTags
50-
.withTopic("testTopic")
51-
.withGroup("testGroup")
52-
.withType("testType")
53-
.build()
54-
dataStreams.add(new StatsPoint(tags, 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null))
50+
def tg = DataStreamsTags.create("testType", null, "testTopic", "testGroup", null)
51+
dataStreams.add(new StatsPoint(tg, 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null))
5552
timeSource.advance(Config.get().getDataStreamsBucketDurationNanoseconds())
5653
dataStreams.report()
5754

0 commit comments

Comments
 (0)