@@ -84,7 +84,7 @@ class DataStreamsWritingTest extends DDCoreSpecification {
8484 dataStreams. start()
8585 dataStreams. setThreadServiceName(serviceNameOverride)
8686 dataStreams. add(new StatsPoint (DataStreamsTags . create(null , null ), 9 , 0 , 10 , timeSource. currentTimeNanos, 0 , 0 , 0 , serviceNameOverride))
87- dataStreams. trackBacklog(DataStreamsTags . createWithPartition(" kafka_produce" , " testTopic" , " 1" ), 130 )
87+ dataStreams. trackBacklog(DataStreamsTags . createWithPartition(" kafka_produce" , " testTopic" , " 1" , null , null ), 130 )
8888 timeSource. advance(DEFAULT_BUCKET_DURATION_NANOS )
8989 // force flush
9090 dataStreams. report()
@@ -106,11 +106,6 @@ class DataStreamsWritingTest extends DDCoreSpecification {
106106 assert unpacker. unpackString() == serviceNameOverride
107107 }
108108
109- def getTags (String type , String topic , String partition , String group ) {
110- return DataStreamsTags . createWithPartition(type, topic, partition, null , group)
111- }
112-
113-
114109 def " Write bucket to mock server with process tags enabled #processTagsEnabled" () {
115110 setup :
116111 injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED , " $processTagsEnabled " )
@@ -147,14 +142,14 @@ class DataStreamsWritingTest extends DDCoreSpecification {
147142 def dataStreams = new DefaultDataStreamsMonitoring (fakeConfig, sharedCommObjects, timeSource, { traceConfig })
148143 dataStreams. start()
149144 dataStreams. add(new StatsPoint (DataStreamsTags . create(null , null ), 9 , 0 , 10 , timeSource. currentTimeNanos, 0 , 0 , 0 , null ))
150- dataStreams. add(new StatsPoint (getTags (" testType" , " testTopic" , null , null ), 1 , 2 , 5 , timeSource. currentTimeNanos, 0 , 0 , 0 , null ))
151- dataStreams. trackBacklog(getTags (" kafka_produce" , " testTopic" , " 1" , null ), 100 )
152- dataStreams. trackBacklog(getTags (" kafka_produce" , " testTopic" , " 1" , null ), 130 )
145+ dataStreams. add(new StatsPoint (DataStreamsTags . create (" testType" , DataStreamsTags.Direction.Inbound , " testTopic" , " testGroup " , null ), 1 , 2 , 5 , timeSource. currentTimeNanos, 0 , 0 , 0 , null ))
146+ dataStreams. trackBacklog(DataStreamsTags . createWithPartition (" kafka_produce" , " testTopic" , " 1" , null , null ), 100 )
147+ dataStreams. trackBacklog(DataStreamsTags . createWithPartition (" kafka_produce" , " testTopic" , " 1" , null , null ), 130 )
153148 timeSource. advance(DEFAULT_BUCKET_DURATION_NANOS - 100l )
154- dataStreams. add(new StatsPoint (getTags (" testType" , " testTopic" , null , " testGroup" ), 1 , 2 , 5 , timeSource. currentTimeNanos, SECONDS . toNanos(10 ), SECONDS . toNanos(10 ), 10 , null ))
149+ dataStreams. add(new StatsPoint (DataStreamsTags . create (" testType" , DataStreamsTags.Direction.Inbound , " testTopic" , " testGroup" , null ), 1 , 2 , 5 , timeSource. currentTimeNanos, SECONDS . toNanos(10 ), SECONDS . toNanos(10 ), 10 , null ))
155150 timeSource. advance(DEFAULT_BUCKET_DURATION_NANOS )
156- dataStreams. add(new StatsPoint (getTags (" testType" , " testTopic" , null , " testGroup" ), 1 , 2 , 5 , timeSource. currentTimeNanos, SECONDS . toNanos(5 ), SECONDS . toNanos(5 ), 5 , null ))
157- dataStreams. add(new StatsPoint (getTags (" testType" , " testTopic2" , null , " testGroup" ), 3 , 4 , 6 , timeSource. currentTimeNanos, SECONDS . toNanos(2 ), 0 , 2 , null ))
151+ dataStreams. add(new StatsPoint (DataStreamsTags . create (" testType" , DataStreamsTags.Direction.Inbound , " testTopic" , " testGroup" , null ), 1 , 2 , 5 , timeSource. currentTimeNanos, SECONDS . toNanos(5 ), SECONDS . toNanos(5 ), 5 , null ))
152+ dataStreams. add(new StatsPoint (DataStreamsTags . create (" testType" , DataStreamsTags.Direction.Inbound , " testTopic2" , " testGroup" , null ), 3 , 4 , 6 , timeSource. currentTimeNanos, SECONDS . toNanos(2 ), 0 , 2 , null ))
158153 timeSource. advance(DEFAULT_BUCKET_DURATION_NANOS )
159154 dataStreams. close()
160155
@@ -231,7 +226,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
231226 assert unpacker. unpackString() == " ParentHash"
232227 assert unpacker. unpackLong() == 2
233228 assert unpacker. unpackString() == " EdgeTags"
234- assert unpacker. unpackArrayHeader() == 3
229+ assert unpacker. unpackArrayHeader() == 4
230+ assert unpacker. unpackString() == " direction:in"
235231 assert unpacker. unpackString() == " topic:testTopic"
236232 assert unpacker. unpackString() == " type:testType"
237233 assert unpacker. unpackString() == " group:testGroup"
@@ -274,7 +270,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
274270 assert unpacker. unpackString() == " ParentHash"
275271 assert unpacker. unpackLong() == (hash == 1 ? 2 : 4 )
276272 assert unpacker. unpackString() == " EdgeTags"
277- assert unpacker. unpackArrayHeader() == 3
273+ assert unpacker. unpackArrayHeader() == 4
274+ assert unpacker. unpackString() == " direction:in"
278275 assert unpacker. unpackString() == (hash == 1 ? " topic:testTopic" : " topic:testTopic2" )
279276 assert unpacker. unpackString() == " type:testType"
280277 assert unpacker. unpackString() == " group:testGroup"
0 commit comments