@@ -71,20 +71,21 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
7171
7272 when :
7373 timeSource. advance(50 )
74- context. setCheckpoint(fromTags(DataStreamsTags . create(" internal" , null )), pointConsumer)
74+ context. setCheckpoint(fromTags(DataStreamsTags . create(" internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
7575 timeSource. advance(25 )
76- def tags = DataStreamsTags . create(" kafka" , null , " topic" , " group" , null )
76+ def tags = DataStreamsTags . create(" kafka" , DataStreamsTags.Direction.Outbound , " topic" , " group" , null )
7777 context. setCheckpoint(fromTags(tags), pointConsumer)
7878
7979 then :
8080 context. isStarted()
8181 pointConsumer. points. size() == 2
8282 verifyFirstPoint(pointConsumer. points[0 ])
8383 with(pointConsumer. points[1 ]) {
84- tags. group == " group"
85- tags. topic == " topic"
86- tags. type == " kafka"
87- tags. size() == 3
84+ tags. group == " group:group"
85+ tags. topic == " topic:topic"
86+ tags. type == " type:kafka"
87+ tags. getDirection() == " direction:out"
88+ tags. nonNullSize() == 4
8889 parentHash == pointConsumer. points[0 ]. hash
8990 hash != 0
9091 pathwayLatencyNano == 25
@@ -100,15 +101,17 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
100101 when :
101102 timeSource. advance(25 )
102103 context. setCheckpoint(
103- create(new LinkedHashMap<> ([ " group " : " group " , " topic" : " topic " , " type " : " kafka " ] ), 0 , 72 ),
104+ create(DataStreamsTags . create( " kafka " , null , " topic" , " group " , null ), 0 , 72 ),
104105 pointConsumer)
105106
106107 then :
107108 context. isStarted()
108109 pointConsumer. points. size() == 1
109110 with(pointConsumer. points[0 ]) {
110- edgeTags == [" group:group" , " topic:topic" , " type:kafka" ]
111- edgeTags. size() == 3
111+ tags. getGroup() == " group:group"
112+ tags. getTopic() == " topic:topic"
113+ tags. getType() == " type:kafka"
114+ tags. nonNullSize() == 3
112115 hash != 0
113116 payloadSizeBytes == 72
114117 }
@@ -121,29 +124,34 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
121124
122125 when :
123126 timeSource. advance(50 )
124- context. setCheckpoint(fromTags(new LinkedHashMap<> ([ " direction " : " out " , " type " : " kafka " ] )), pointConsumer)
127+ context. setCheckpoint(fromTags(DataStreamsTags . create( " kafka " , DataStreamsTags.Direction.Outbound )), pointConsumer)
125128 timeSource. advance(25 )
126- context . setCheckpoint(
127- fromTags(new LinkedHashMap<> ([ " direction " : " in " , " group " : " group " , " topic " : " topic " , " type " : " kafka " ]) ), pointConsumer)
129+ def tg = DataStreamsTags . create( " kafka " , DataStreamsTags.Direction.Inbound , " topic " , " group " , null )
130+ context . setCheckpoint( fromTags(tg ), pointConsumer)
128131 timeSource. advance(30 )
129- context. setCheckpoint(
130- fromTags(new LinkedHashMap<> ([" direction" : " in" , " group" : " group" , " topic" : " topic" , " type" : " kafka" ])), pointConsumer)
132+ context. setCheckpoint(fromTags(tg), pointConsumer)
131133
132134 then :
133135 context. isStarted()
134136 pointConsumer. points. size() == 3
135137 verifyFirstPoint(pointConsumer. points[0 ])
136138 with(pointConsumer. points[1 ]) {
137- edgeTags == [" direction:in" , " group:group" , " topic:topic" , " type:kafka" ]
138- edgeTags. size() == 4
139+ tags. nonNullSize() == 4
140+ tags. direction == " direction:in"
141+ tags. group == " group:group"
142+ tags. topic == " topic:topic"
143+ tags. type == " type:kafka"
139144 parentHash == pointConsumer. points[0 ]. hash
140145 hash != 0
141146 pathwayLatencyNano == 25
142147 edgeLatencyNano == 25
143148 }
144149 with(pointConsumer. points[2 ]) {
145- edgeTags == [" direction:in" , " group:group" , " topic:topic" , " type:kafka" ]
146- edgeTags. size() == 4
150+ tags. nonNullSize() == 4
151+ tags. direction == " direction:in"
152+ tags. group == " group:group"
153+ tags. topic == " topic:topic"
154+ tags. type == " type:kafka"
147155 // this point should have the first point as parent,
148156 // as the loop protection will reset the parent if two identical
149157 // points (same hash for tag values) are about to form a hierarchy
@@ -173,19 +181,20 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
173181
174182 when :
175183 timeSource. advance(MILLISECONDS . toNanos(50 ))
176- context. setCheckpoint(fromTags(new LinkedHashMap<> ([ " type " : " s3" , " ds.namespace " : " my_bucket " , " ds.name " : " my_object.csv" , " direction " : " in " ] )), pointConsumer)
184+ context. setCheckpoint(fromTags(DataStreamsTags . createWithDataset( " s3" , DataStreamsTags.Direction.Inbound , null , " my_object.csv" , " my_bucket " )), pointConsumer)
177185 def encoded = context. encode()
178186 timeSource. advance(MILLISECONDS . toNanos(2 ))
179187 def decodedContext = DefaultPathwayContext . decode(timeSource, baseHash, null , encoded)
180188 timeSource. advance(MILLISECONDS . toNanos(25 ))
181- context. setCheckpoint(fromTags(new LinkedHashMap<> ([" type" : " s3" , " ds.namespace" : " my_bucket" , " ds.name" : " my_object.csv" , " direction" : " out" ])), pointConsumer)
189+ def tg = DataStreamsTags . createWithDataset(" s3" , DataStreamsTags.Direction.Outbound , null , " my_object.csv" , " my_bucket" )
190+ context. setCheckpoint(fromTags(tg), pointConsumer)
182191
183192 then :
184193 decodedContext. isStarted()
185194 pointConsumer. points. size() == 2
186195
187196 // all points should have datasetHash, which is not equal to hash or 0
188- for (var i = 0 ; i < pointConsumer. points. size(); i++ ){
197+ for (def i = 0 ; i < pointConsumer. points. size(); i++ ){
189198 pointConsumer. points[i]. aggregationHash != pointConsumer. points[i]. hash
190199 pointConsumer. points[i]. aggregationHash != 0
191200 }
@@ -199,20 +208,22 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
199208
200209 when:
201210 timeSource.advance(MILLISECONDS.toNanos(50))
202- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
211+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
203212 def encoded = context.encode()
204213 timeSource.advance(MILLISECONDS.toNanos(2))
205214 def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded)
206215 timeSource.advance(MILLISECONDS.toNanos(25))
207- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic" : " topic " , " type " : " kafka " ] )), pointConsumer)
216+ context.setCheckpoint(fromTags(DataStreamsTags.create( " kafka " , null , " topic" , " group " , null )), pointConsumer)
208217
209218 then:
210219 decodedContext.isStarted()
211220 pointConsumer.points.size() == 2
212221
213222 with(pointConsumer.points[1]) {
214- edgeTags == [" group :group" , " topic :topic" , " type :kafka" ]
215- edgeTags.size() == 3
223+ tags.nonNullSize() == 3
224+ tags.getGroup() == " group :group"
225+ tags.getType() == " type :kafka"
226+ tags.getTopic() == " topic :topic"
216227 parentHash == pointConsumer.points[0].hash
217228 hash != 0
218229 pathwayLatencyNano == MILLISECONDS.toNanos(27)
@@ -226,13 +237,13 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
226237 def context = new DefaultPathwayContext(timeSource, baseHash, null)
227238 def timeFromQueue = timeSource.getCurrentTimeMillis() - 200
228239 when:
229- context.setCheckpoint(create([ " type " : " internal" ] , timeFromQueue, 0), pointConsumer)
240+ context.setCheckpoint(create(DataStreamsTags.create( " internal" , null) , timeFromQueue, 0), pointConsumer)
230241 then:
231242 context.isStarted()
232243 pointConsumer.points.size() == 1
233244 with(pointConsumer.points[0]) {
234- edgeTags == [ " type :internal" ]
235- edgeTags.size () == 1
245+ tags.getType() == " type :internal"
246+ tags.nonNullSize () == 1
236247 parentHash == 0
237248 hash != 0
238249 pathwayLatencyNano == MILLISECONDS.toNanos(200)
@@ -248,20 +259,23 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
248259
249260 when:
250261 timeSource.advance(MILLISECONDS.toNanos(50))
251- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
262+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
252263
253264 def encoded = context.encode()
254265 timeSource.advance(MILLISECONDS.toNanos(1))
255266 def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded)
256267 timeSource.advance(MILLISECONDS.toNanos(25))
257- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic" : " topic " , " type " : " kafka " ] )), pointConsumer)
268+ context.setCheckpoint(fromTags(DataStreamsTags.create( " kafka " , DataStreamsTags.Direction.Outbound , " topic" , " group " , null )), pointConsumer)
258269
259270 then:
260271 decodedContext.isStarted()
261272 pointConsumer.points.size() == 2
262273 with(pointConsumer.points[1]) {
263- edgeTags == [" group :group" , " topic :topic" , " type :kafka" ]
264- edgeTags.size() == 3
274+ tags.group == " group :group"
275+ tags.topic == " topic :topic"
276+ tags.type == " type :kafka"
277+ tags.direction == " direction :out"
278+ tags.nonNullSize() == 4
265279 parentHash == pointConsumer.points[0].hash
266280 hash != 0
267281 pathwayLatencyNano == MILLISECONDS.toNanos(26)
@@ -273,14 +287,17 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
273287 timeSource.advance(MILLISECONDS.toNanos(2))
274288 def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode)
275289 timeSource.advance(MILLISECONDS.toNanos(30))
276- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic " : " topicB" , " type " : " kafka " ] )), pointConsumer)
290+ context.setCheckpoint(fromTags(DataStreamsTags.create( " kafka " , DataStreamsTags.Direction.Inbound , " topicB" , " group " , null )), pointConsumer)
277291
278292 then:
279293 secondDecode.isStarted()
280294 pointConsumer.points.size() == 3
281295 with(pointConsumer.points[2]) {
282- edgeTags == [" group :group" , " topic :topicB" , " type :kafka" ]
283- edgeTags.size() == 3
296+ tags.group == " group :group"
297+ tags.topic == " topic :topicB"
298+ tags.type == " type :kafka"
299+ tags.direction == " direction :in "
300+ tags.nonNullSize() == 4
284301 parentHash == pointConsumer.points[1].hash
285302 hash != 0
286303 pathwayLatencyNano == MILLISECONDS.toNanos(58)
@@ -297,21 +314,24 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
297314
298315 when:
299316 timeSource.advance(MILLISECONDS.toNanos(50))
300- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
317+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
301318
302319 def encoded = context.encode()
303320 Map<String, String> carrier = [(PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
304321 timeSource.advance(MILLISECONDS.toNanos(1))
305322 def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null)
306323 timeSource.advance(MILLISECONDS.toNanos(25))
307- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic" : " topic " , " type " : " kafka " ] )), pointConsumer)
324+ context.setCheckpoint(fromTags(DataStreamsTags.create( " kafka " , DataStreamsTags.Direction.Outbound , " topic" , " group " , null )), pointConsumer)
308325
309326 then:
310327 decodedContext.isStarted()
311328 pointConsumer.points.size() == 2
312329 with(pointConsumer.points[1]) {
313- edgeTags == [" group :group" , " topic :topic" , " type :kafka" ]
314- edgeTags.size() == 3
330+ tags.nonNullSize() == 4
331+ tags.group == " group :group"
332+ tags.topic == " topic :topic"
333+ tags.type == " type :kafka"
334+ tags.direction == " direction :out"
315335 parentHash == pointConsumer.points[0].hash
316336 hash != 0
317337 pathwayLatencyNano == MILLISECONDS.toNanos(26)
@@ -324,14 +344,17 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
324344 timeSource.advance(MILLISECONDS.toNanos(2))
325345 def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null)
326346 timeSource.advance(MILLISECONDS.toNanos(30))
327- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic " : " topicB" , " type " : " kafka " ] )), pointConsumer)
347+ context.setCheckpoint(fromTags(DataStreamsTags.create( " kafka " , DataStreamsTags.Direction.Inbound , " topicB" , " group " , null )), pointConsumer)
328348
329349 then:
330350 secondDecode.isStarted()
331351 pointConsumer.points.size() == 3
332352 with(pointConsumer.points[2]) {
333- edgeTags == [" group :group" , " topic :topicB" , " type :kafka" ]
334- edgeTags.size() == 3
353+ tags.nonNullSize() == 4
354+ tags.group == " group :group"
355+ tags.topic == " topic :topicB"
356+ tags.type == " type :kafka"
357+ tags.direction == " direction :in "
335358 parentHash == pointConsumer.points[1].hash
336359 hash != 0
337360 pathwayLatencyNano == MILLISECONDS.toNanos(58)
@@ -348,21 +371,23 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
348371
349372 when:
350373 timeSource.advance(MILLISECONDS.toNanos(50))
351- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
374+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
352375
353376 def encoded = context.encode()
354377 Map<String, String> carrier = [(PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
355378 timeSource.advance(MILLISECONDS.toNanos(1))
356379 def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null)
357380 timeSource.advance(MILLISECONDS.toNanos(25))
358- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " topic " : " topic" , " type " : " sqs " ] )), pointConsumer)
381+ context.setCheckpoint(fromTags(DataStreamsTags.create( " sqs " , DataStreamsTags.Direction.Outbound, " topic" , null, null )), pointConsumer)
359382
360383 then:
361384 decodedContext.isStarted()
362385 pointConsumer.points.size() == 2
363386 with(pointConsumer.points[1]) {
364- edgeTags == [" topic :topic" , " type :sqs" ]
365- edgeTags.size() == 2
387+ tags.direction == " direction :out"
388+ tags.topic == " topic :topic"
389+ tags.type == " type :sqs"
390+ tags.nonNullSize() == 3
366391 parentHash == pointConsumer.points[0].hash
367392 hash != 0
368393 pathwayLatencyNano == MILLISECONDS.toNanos(26)
@@ -375,14 +400,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
375400 timeSource.advance(MILLISECONDS.toNanos(2))
376401 def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null)
377402 timeSource.advance(MILLISECONDS.toNanos(30))
378- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " topic " : " topicB" , " type " : " sqs " ] )), pointConsumer)
403+ context.setCheckpoint(fromTags(DataStreamsTags.create( " sqs " , DataStreamsTags.Direction.Inbound, " topicB" , null, null )), pointConsumer)
379404
380405 then:
381406 secondDecode.isStarted()
382407 pointConsumer.points.size() == 3
383408 with(pointConsumer.points[2]) {
384- edgeTags == [" topic :topicB" , " type :sqs" ]
385- edgeTags.size() == 2
409+ tags.type == " type :sqs"
410+ tags.topic == " topic :topicB"
411+ tags.nonNullSize() == 3
386412 parentHash == pointConsumer.points[1].hash
387413 hash != 0
388414 pathwayLatencyNano == MILLISECONDS.toNanos(58)
@@ -397,26 +423,29 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
397423
398424 when:
399425 timeSource.advance(50)
400- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
426+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
401427 timeSource.advance(25)
402- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " group " : " group " , " topic" : " topic " , " type " : " type " ] )), pointConsumer)
428+ context.setCheckpoint(fromTags(DataStreamsTags.create( " type " , DataStreamsTags.Direction.Outbound , " topic" , " group " , null )), pointConsumer)
403429 timeSource.advance(25)
404- context.setCheckpoint(fromTags(new LinkedHashMap<>( )), pointConsumer)
430+ context.setCheckpoint(fromTags(DataStreamsTags.create(null, null )), pointConsumer)
405431
406432 then:
407433 context.isStarted()
408434 pointConsumer.points.size() == 3
409435 verifyFirstPoint(pointConsumer.points[0])
410436 with(pointConsumer.points[1]) {
411- edgeTags == [" group :group" , " topic :topic" , " type :type" ]
412- edgeTags.size() == 3
437+ tags.type == " type :type"
438+ tags.topic == " topic :topic"
439+ tags.group == " group :group"
440+ tags.direction == " direction :out"
441+ tags.nonNullSize() == 4
413442 parentHash == pointConsumer.points[0].hash
414443 hash != 0
415444 pathwayLatencyNano == 25
416445 edgeLatencyNano == 25
417446 }
418447 with(pointConsumer.points[2]) {
419- edgeTags.size () == 0
448+ tags.nonNullSize () == 0
420449 parentHash == pointConsumer.points[1].hash
421450 hash != 0
422451 pathwayLatencyNano == 50
@@ -475,7 +504,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
475504
476505 def context = new DefaultPathwayContext(timeSource, baseHash, null)
477506 timeSource.advance(MILLISECONDS.toNanos(50))
478- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal " ] )), pointConsumer)
507+ context.setCheckpoint(fromTags(DataStreamsTags.create( " itnernal " , DataStreamsTags.Direction.Inbound )), pointConsumer)
479508 def encoded = context.encode()
480509 Map<String, String> carrier = [
481510 (PROPAGATION_KEY_BASE64): encoded,
@@ -521,7 +550,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
521550
522551 def context = new DefaultPathwayContext(timeSource, baseHash, null)
523552 timeSource.advance(MILLISECONDS.toNanos(50))
524- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
553+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , DataStreamsTags.Direction.Inbound )), pointConsumer)
525554 def encoded = context.encode()
526555
527556 Map<String, String> carrier = [(PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
@@ -564,7 +593,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
564593
565594 def context = new DefaultPathwayContext(timeSource, baseHash, null)
566595 timeSource.advance(MILLISECONDS.toNanos(50))
567- context.setCheckpoint(fromTags(new LinkedHashMap<>([ " type " : " internal" ] )), pointConsumer)
596+ context.setCheckpoint(fromTags(DataStreamsTags.create( " internal" , null )), pointConsumer)
568597 def encoded = context.encode()
569598 Map<String, String> carrier = [(PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
570599 def contextVisitor = new Base64MapContextVisitor()
@@ -609,7 +638,8 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
609638 isDataStreamsEnabled() >> true
610639 }
611640
612- def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
641+ def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig },
642+ wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
613643
614644 Map<String, String> carrier = [" someotherkey" : " someothervalue" ]
615645 def contextVisitor = new Base64MapContextVisitor()
0 commit comments