@@ -45,7 +45,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
4545 def " First Set checkpoint starts the context." () {
4646 given :
4747 def timeSource = new ControllableTimeSource ()
48- def context = new DefaultPathwayContext (timeSource, baseHash)
48+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
4949
5050 when :
5151 timeSource. advance(50 )
@@ -60,7 +60,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
6060 def " Checkpoint generated" () {
6161 given :
6262 def timeSource = new ControllableTimeSource ()
63- def context = new DefaultPathwayContext (timeSource, baseHash)
63+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
6464
6565 when :
6666 timeSource. advance(50 )
@@ -86,7 +86,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
8686 def " Checkpoint with payload size" () {
8787 given :
8888 def timeSource = new ControllableTimeSource ()
89- def context = new DefaultPathwayContext (timeSource, baseHash)
89+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
9090
9191 when :
9292 timeSource. advance(25 )
@@ -107,7 +107,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
107107 def " Multiple checkpoints generated" () {
108108 given :
109109 def timeSource = new ControllableTimeSource ()
110- def context = new DefaultPathwayContext (timeSource, baseHash)
110+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
111111
112112 when :
113113 timeSource. advance(50 )
@@ -147,7 +147,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
147147 def " Exception thrown when trying to encode unstarted context" () {
148148 given :
149149 def timeSource = new ControllableTimeSource ()
150- def context = new DefaultPathwayContext (timeSource, baseHash)
150+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
151151
152152 when :
153153 context. encode()
@@ -159,14 +159,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
159159 def " Set checkpoint with dataset tags" () {
160160 given :
161161 def timeSource = new ControllableTimeSource ()
162- def context = new DefaultPathwayContext (timeSource, baseHash)
162+ def context = new DefaultPathwayContext (timeSource, baseHash, null )
163163
164164 when :
165165 timeSource. advance(MILLISECONDS . toNanos(50 ))
166166 context. setCheckpoint(new LinkedHashMap<> ([" type" : " s3" , " ds.namespace" : " my_bucket" , " ds.name" : " my_object.csv" , " direction" : " in" ]), pointConsumer)
167167 def encoded = context. strEncode()
168168 timeSource. advance(MILLISECONDS . toNanos(2 ))
169- def decodedContext = DefaultPathwayContext . strDecode(timeSource, baseHash, encoded)
169+ def decodedContext = DefaultPathwayContext . strDecode(timeSource, baseHash, null , encoded)
170170 timeSource. advance(MILLISECONDS . toNanos(25 ))
171171 context. setCheckpoint(new LinkedHashMap<> ([" type" : " s3" , " ds.namespace" : " my_bucket" , " ds.name" : " my_object.csv" , " direction" : " out" ]), pointConsumer)
172172
@@ -185,14 +185,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
185185 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
186186 given:
187187 def timeSource = new ControllableTimeSource()
188- def context = new DefaultPathwayContext(timeSource, baseHash)
188+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
189189
190190 when:
191191 timeSource.advance(MILLISECONDS.toNanos(50))
192192 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
193193 def encoded = context.strEncode()
194194 timeSource.advance(MILLISECONDS.toNanos(2))
195- def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded)
195+ def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded)
196196 timeSource.advance(MILLISECONDS.toNanos(25))
197197 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
198198
@@ -213,7 +213,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
213213 def " Set checkpoint with timestamp" () {
214214 given:
215215 def timeSource = new ControllableTimeSource()
216- def context = new DefaultPathwayContext(timeSource, baseHash)
216+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
217217 def timeFromQueue = timeSource.getCurrentTimeMillis() - 200
218218 when:
219219 context.setCheckpoint([" type" : " internal" ], pointConsumer, timeFromQueue)
@@ -234,15 +234,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
234234 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
235235 given:
236236 def timeSource = new ControllableTimeSource()
237- def context = new DefaultPathwayContext(timeSource, baseHash)
237+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
238238
239239 when:
240240 timeSource.advance(MILLISECONDS.toNanos(50))
241241 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
242242
243243 def encoded = context.strEncode()
244244 timeSource.advance(MILLISECONDS.toNanos(1))
245- def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded)
245+ def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, null, encoded)
246246 timeSource.advance(MILLISECONDS.toNanos(25))
247247 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
248248
@@ -261,7 +261,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
261261 when:
262262 def secondEncode = decodedContext.strEncode()
263263 timeSource.advance(MILLISECONDS.toNanos(2))
264- def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, secondEncode)
264+ def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, null, secondEncode)
265265 timeSource.advance(MILLISECONDS.toNanos(30))
266266 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topicB" , " type" : " kafka" ]), pointConsumer)
267267
@@ -282,7 +282,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
282282 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
283283 given:
284284 def timeSource = new ControllableTimeSource()
285- def context = new DefaultPathwayContext(timeSource, baseHash)
285+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
286286 def contextVisitor = new Base64MapContextVisitor()
287287
288288 when:
@@ -292,7 +292,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
292292 def encoded = context.strEncode()
293293 Map<String, String> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
294294 timeSource.advance(MILLISECONDS.toNanos(1))
295- def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
295+ def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null )
296296 timeSource.advance(MILLISECONDS.toNanos(25))
297297 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
298298
@@ -312,7 +312,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
312312 def secondEncode = decodedContext.strEncode()
313313 carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
314314 timeSource.advance(MILLISECONDS.toNanos(2))
315- def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
315+ def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null )
316316 timeSource.advance(MILLISECONDS.toNanos(30))
317317 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topicB" , " type" : " kafka" ]), pointConsumer)
318318
@@ -333,14 +333,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
333333 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
334334 given:
335335 def timeSource = new ControllableTimeSource()
336- def context = new DefaultPathwayContext(timeSource, baseHash)
336+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
337337
338338 when:
339339 timeSource.advance(MILLISECONDS.toNanos(50))
340340 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
341341 def encoded = context.encode()
342342 timeSource.advance(MILLISECONDS.toNanos(2))
343- def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded)
343+ def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded)
344344 timeSource.advance(MILLISECONDS.toNanos(25))
345345 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
346346
@@ -362,15 +362,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
362362 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
363363 given:
364364 def timeSource = new ControllableTimeSource()
365- def context = new DefaultPathwayContext(timeSource, baseHash)
365+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
366366
367367 when:
368368 timeSource.advance(MILLISECONDS.toNanos(50))
369369 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
370370
371371 def encoded = context.encode()
372372 timeSource.advance(MILLISECONDS.toNanos(1))
373- def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded)
373+ def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, null, encoded)
374374 timeSource.advance(MILLISECONDS.toNanos(25))
375375 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
376376
@@ -389,7 +389,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
389389 when:
390390 def secondEncode = decodedContext.encode()
391391 timeSource.advance(MILLISECONDS.toNanos(2))
392- def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, secondEncode)
392+ def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, null, secondEncode)
393393 timeSource.advance(MILLISECONDS.toNanos(30))
394394 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topicB" , " type" : " kafka" ]), pointConsumer)
395395
@@ -409,7 +409,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
409409 def " Legacy binary encoding" () {
410410 given:
411411 def timeSource = new ControllableTimeSource()
412- def context = new DefaultPathwayContext(timeSource, baseHash)
412+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
413413 def contextVisitor = new BinaryMapContextVisitor()
414414
415415 when:
@@ -419,7 +419,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
419419 def encoded = java.util.Base64.getDecoder().decode(context.encode())
420420 Map<String, byte[]> carrier = [(PathwayContext.PROPAGATION_KEY): encoded]
421421 timeSource.advance(MILLISECONDS.toNanos(1))
422- def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
422+ def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null )
423423
424424 then:
425425 decodedContext.strEncode() == context.strEncode()
@@ -429,7 +429,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
429429 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
430430 given:
431431 def timeSource = new ControllableTimeSource()
432- def context = new DefaultPathwayContext(timeSource, baseHash)
432+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
433433 def contextVisitor = new BinaryMapContextVisitor()
434434
435435 when:
@@ -439,7 +439,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
439439 def encoded = context.encode()
440440 Map<String, byte[]> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, " someotherkey" : new byte[0]]
441441 timeSource.advance(MILLISECONDS.toNanos(1))
442- def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
442+ def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null )
443443 timeSource.advance(MILLISECONDS.toNanos(25))
444444 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topic" , " type" : " kafka" ]), pointConsumer)
445445
@@ -459,7 +459,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
459459 def secondEncode = decodedContext.encode()
460460 carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
461461 timeSource.advance(MILLISECONDS.toNanos(2))
462- def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
462+ def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash, null )
463463 timeSource.advance(MILLISECONDS.toNanos(30))
464464 context.setCheckpoint(new LinkedHashMap<>([" group" : " group" , " topic" : " topicB" , " type" : " kafka" ]), pointConsumer)
465465
@@ -480,7 +480,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
480480 // Timesource needs to be advanced in milliseconds because encoding truncates to millis
481481 given:
482482 def timeSource = new ControllableTimeSource()
483- def context = new DefaultPathwayContext(timeSource, baseHash)
483+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
484484 def contextVisitor = new Base64MapContextVisitor()
485485
486486 when:
@@ -490,7 +490,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
490490 def encoded = context.strEncode()
491491 Map<String, String> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, " someotherkey" : " someothervalue" ]
492492 timeSource.advance(MILLISECONDS.toNanos(1))
493- def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
493+ def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null )
494494 timeSource.advance(MILLISECONDS.toNanos(25))
495495 context.setCheckpoint(new LinkedHashMap<>([" topic" : " topic" , " type" : " sqs" ]), pointConsumer)
496496
@@ -510,7 +510,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
510510 def secondEncode = decodedContext.strEncode()
511511 carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
512512 timeSource.advance(MILLISECONDS.toNanos(2))
513- def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
513+ def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash, null )
514514 timeSource.advance(MILLISECONDS.toNanos(30))
515515 context.setCheckpoint(new LinkedHashMap<>([" topic" : " topicB" , " type" : " sqs" ]), pointConsumer)
516516
@@ -530,7 +530,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
530530 def " Empty tags not set" () {
531531 given:
532532 def timeSource = new ControllableTimeSource()
533- def context = new DefaultPathwayContext(timeSource, baseHash)
533+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
534534
535535 when:
536536 timeSource.advance(50)
@@ -591,7 +591,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
591591
592592 def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
593593
594- def context = new DefaultPathwayContext(timeSource, baseHash)
594+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
595595 timeSource.advance(MILLISECONDS.toNanos(50))
596596 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
597597 def encoded = context.strEncode()
@@ -637,7 +637,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
637637
638638 def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
639639
640- def context = new DefaultPathwayContext(timeSource, baseHash)
640+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
641641 timeSource.advance(MILLISECONDS.toNanos(50))
642642 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
643643 def encoded = context.strEncode()
@@ -678,7 +678,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
678678
679679 def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
680680
681- def context = new DefaultPathwayContext(timeSource, baseHash)
681+ def context = new DefaultPathwayContext(timeSource, baseHash, null )
682682 timeSource.advance(MILLISECONDS.toNanos(50))
683683 context.setCheckpoint(new LinkedHashMap<>([" type" : " internal" ]), pointConsumer)
684684 def encoded = context.strEncode()
0 commit comments