@@ -65,10 +65,11 @@ func TestProcessorBasic(t *testing.T) {
65
65
r1Stream .ctx ,
66
66
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
67
67
hlc.Timestamp {WallTime : 1 },
68
- nil , /* catchUpIter */
69
- false , /* withDiff */
70
- false , /* withFiltering */
71
- false /* withOmitRemote */ , false , /* withBulkDelivery */
68
+ nil , /* catchUpIter */
69
+ false , /* withDiff */
70
+ false , /* withFiltering */
71
+ false , /* withOmitRemote */
72
+ noBulkDelivery ,
72
73
h .toBufferedStreamIfNeeded (r1Stream ),
73
74
)
74
75
require .True (t , r1OK )
@@ -199,10 +200,11 @@ func TestProcessorBasic(t *testing.T) {
199
200
r2Stream .ctx ,
200
201
roachpb.RSpan {Key : roachpb .RKey ("c" ), EndKey : roachpb .RKey ("z" )},
201
202
hlc.Timestamp {WallTime : 1 },
202
- nil , /* catchUpIter */
203
- true , /* withDiff */
204
- true , /* withFiltering */
205
- false /* withOmitRemote */ , false , /* withBulkDelivery */
203
+ nil , /* catchUpIter */
204
+ true , /* withDiff */
205
+ true , /* withFiltering */
206
+ false , /* withOmitRemote */
207
+ noBulkDelivery ,
206
208
h .toBufferedStreamIfNeeded (r2Stream ),
207
209
)
208
210
require .True (t , r2OK )
@@ -311,10 +313,11 @@ func TestProcessorBasic(t *testing.T) {
311
313
r3Stream .ctx ,
312
314
roachpb.RSpan {Key : roachpb .RKey ("c" ), EndKey : roachpb .RKey ("z" )},
313
315
hlc.Timestamp {WallTime : 1 },
314
- nil , /* catchUpIter */
315
- false , /* withDiff */
316
- false , /* withFiltering */
317
- false /* withOmitRemote */ , false , /* withBulkDelivery */
316
+ nil , /* catchUpIter */
317
+ false , /* withDiff */
318
+ false , /* withFiltering */
319
+ false , /* withOmitRemote */
320
+ noBulkDelivery ,
318
321
h .toBufferedStreamIfNeeded (r3Stream ),
319
322
)
320
323
require .True (t , r30K )
@@ -332,10 +335,11 @@ func TestProcessorBasic(t *testing.T) {
332
335
r4Stream .ctx ,
333
336
roachpb.RSpan {Key : roachpb .RKey ("c" ), EndKey : roachpb .RKey ("z" )},
334
337
hlc.Timestamp {WallTime : 1 },
335
- nil , /* catchUpIter */
336
- false , /* withDiff */
337
- false , /* withFiltering */
338
- false /* withOmitRemote */ , false , /* withBulkDelivery */
338
+ nil , /* catchUpIter */
339
+ false , /* withDiff */
340
+ false , /* withFiltering */
341
+ false , /* withOmitRemote */
342
+ noBulkDelivery ,
339
343
h .toBufferedStreamIfNeeded (r4Stream ),
340
344
)
341
345
require .False (t , r4OK )
@@ -357,10 +361,11 @@ func TestProcessorOmitRemote(t *testing.T) {
357
361
r1Stream .ctx ,
358
362
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
359
363
hlc.Timestamp {WallTime : 1 },
360
- nil , /* catchUpIter */
361
- false , /* withDiff */
362
- false , /* withFiltering */
363
- false /* withOmitRemote */ , false , /* withBulkDelivery */
364
+ nil , /* catchUpIter */
365
+ false , /* withDiff */
366
+ false , /* withFiltering */
367
+ false , /* withOmitRemote */
368
+ noBulkDelivery ,
364
369
h .toBufferedStreamIfNeeded (r1Stream ),
365
370
)
366
371
require .True (t , r1OK )
@@ -386,7 +391,7 @@ func TestProcessorOmitRemote(t *testing.T) {
386
391
false , /* withDiff */
387
392
false , /* withFiltering */
388
393
true , /* withOmitRemote */
389
- false , /* withBulkDelivery */
394
+ noBulkDelivery ,
390
395
h .toBufferedStreamIfNeeded (r2Stream ),
391
396
)
392
397
require .True (t , r2OK )
@@ -440,21 +445,23 @@ func TestProcessorSlowConsumer(t *testing.T) {
440
445
r1Stream .ctx ,
441
446
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
442
447
hlc.Timestamp {WallTime : 1 },
443
- nil , /* catchUpIter */
444
- false , /* withDiff */
445
- false , /* withFiltering */
446
- false /* withOmitRemote */ , false , /* withBulkDelivery */
448
+ nil , /* catchUpIter */
449
+ false , /* withDiff */
450
+ false , /* withFiltering */
451
+ false , /* withOmitRemote */
452
+ noBulkDelivery ,
447
453
h .toBufferedStreamIfNeeded (r1Stream ),
448
454
)
449
455
r2Stream := newTestStream ()
450
456
p .Register (
451
457
r2Stream .ctx ,
452
458
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("z" )},
453
459
hlc.Timestamp {WallTime : 1 },
454
- nil , /* catchUpIter */
455
- false , /* withDiff */
456
- false , /* withFiltering */
457
- false /* withOmitRemote */ , false , /* withBulkDelivery */
460
+ nil , /* catchUpIter */
461
+ false , /* withDiff */
462
+ false , /* withFiltering */
463
+ false , /* withOmitRemote */
464
+ noBulkDelivery ,
458
465
h .toBufferedStreamIfNeeded (r2Stream ),
459
466
)
460
467
h .syncEventAndRegistrations ()
@@ -546,10 +553,11 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
546
553
r1Stream .ctx ,
547
554
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
548
555
hlc.Timestamp {WallTime : 1 },
549
- nil , /* catchUpIter */
550
- false , /* withDiff */
551
- false , /* withFiltering */
552
- false /* withOmitRemote */ , false , /* withBulkDelivery */
556
+ nil , /* catchUpIter */
557
+ false , /* withDiff */
558
+ false , /* withFiltering */
559
+ false , /* withOmitRemote */
560
+ noBulkDelivery ,
553
561
h .toBufferedStreamIfNeeded (r1Stream ),
554
562
)
555
563
h .syncEventAndRegistrations ()
@@ -601,10 +609,11 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
601
609
r1Stream .ctx ,
602
610
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
603
611
hlc.Timestamp {WallTime : 1 },
604
- nil , /* catchUpIter */
605
- false , /* withDiff */
606
- false , /* withFiltering */
607
- false /* withOmitRemote */ , false , /* withBulkDelivery */
612
+ nil , /* catchUpIter */
613
+ false , /* withDiff */
614
+ false , /* withFiltering */
615
+ false , /* withOmitRemote */
616
+ noBulkDelivery ,
608
617
h .toBufferedStreamIfNeeded (r1Stream ),
609
618
)
610
619
h .syncEventAndRegistrations ()
@@ -682,10 +691,11 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
682
691
r1Stream .ctx ,
683
692
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
684
693
hlc.Timestamp {WallTime : 1 },
685
- nil , /* catchUpIter */
686
- false , /* withDiff */
687
- false , /* withFiltering */
688
- false /* withOmitRemote */ , false , /* withBulkDelivery */
694
+ nil , /* catchUpIter */
695
+ false , /* withDiff */
696
+ false , /* withFiltering */
697
+ false , /* withOmitRemote */
698
+ noBulkDelivery ,
689
699
h .toBufferedStreamIfNeeded (r1Stream ),
690
700
)
691
701
h .syncEventAndRegistrations ()
@@ -993,7 +1003,8 @@ func TestProcessorConcurrentStop(t *testing.T) {
993
1003
runtime .Gosched ()
994
1004
s := newTestStream ()
995
1005
p .Register (s .ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
996
- false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , false , /* withBulkDelivery */
1006
+ false /* withDiff */ , false /* withFiltering */ , false , /* withOmitRemote */
1007
+ noBulkDelivery ,
997
1008
h .toBufferedStreamIfNeeded (s ))
998
1009
}()
999
1010
go func () {
@@ -1066,7 +1077,8 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
1066
1077
s := newTestStream ()
1067
1078
regs [s ] = firstIdx
1068
1079
p .Register (s .ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
1069
- false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , false , /* withBulkDelivery */
1080
+ false /* withDiff */ , false /* withFiltering */ , false , /* withOmitRemote */
1081
+ noBulkDelivery ,
1070
1082
h .toBufferedStreamIfNeeded (s ))
1071
1083
regDone <- struct {}{}
1072
1084
}
@@ -1123,10 +1135,11 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
1123
1135
rStream .ctx ,
1124
1136
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
1125
1137
hlc.Timestamp {WallTime : 1 },
1126
- nil , /* catchUpIter */
1127
- false , /* withDiff */
1128
- false , /* withFiltering */
1129
- false /* withOmitRemote */ , false , /* withBulkDelivery */
1138
+ nil , /* catchUpIter */
1139
+ false , /* withDiff */
1140
+ false , /* withFiltering */
1141
+ false , /* withOmitRemote */
1142
+ noBulkDelivery ,
1130
1143
h .toBufferedStreamIfNeeded (rStream ),
1131
1144
)
1132
1145
h .syncEventAndRegistrations ()
@@ -1203,10 +1216,11 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
1203
1216
rStream .ctx ,
1204
1217
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
1205
1218
hlc.Timestamp {WallTime : 1 },
1206
- nil , /* catchUpIter */
1207
- false , /* withDiff */
1208
- false , /* withFiltering */
1209
- false /* withOmitRemote */ , false , /* withBulkDelivery */
1219
+ nil , /* catchUpIter */
1220
+ false , /* withDiff */
1221
+ false , /* withFiltering */
1222
+ false , /* withOmitRemote */
1223
+ noBulkDelivery ,
1210
1224
h .toBufferedStreamIfNeeded (rStream ),
1211
1225
)
1212
1226
h .syncEventAndRegistrations ()
@@ -1273,10 +1287,11 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
1273
1287
r1Stream .ctx ,
1274
1288
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
1275
1289
hlc.Timestamp {WallTime : 1 },
1276
- nil , /* catchUpIter */
1277
- false , /* withDiff */
1278
- false , /* withFiltering */
1279
- false /* withOmitRemote */ , false , /* withBulkDelivery */
1290
+ nil , /* catchUpIter */
1291
+ false , /* withDiff */
1292
+ false , /* withFiltering */
1293
+ false , /* withOmitRemote */
1294
+ noBulkDelivery ,
1280
1295
h .toBufferedStreamIfNeeded (r1Stream ),
1281
1296
)
1282
1297
@@ -1286,10 +1301,10 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
1286
1301
r2Stream .ctx ,
1287
1302
roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("m" )},
1288
1303
hlc.Timestamp {WallTime : 1 },
1289
- nil , /* catchUpIter */
1290
- false , /* withDiff */
1291
- false , /* withFiltering */
1292
- false /* withOmitRemote */ , false , /* withBulkDelivery */
1304
+ nil , /* catchUpIter */
1305
+ false , /* withDiff */
1306
+ false , /* withFiltering */
1307
+ false /* withOmitRemote */ , noBulkDelivery ,
1293
1308
h .toBufferedStreamIfNeeded (r2Stream ),
1294
1309
)
1295
1310
h .syncEventAndRegistrations ()
@@ -1458,7 +1473,8 @@ func TestProcessorBackpressure(t *testing.T) {
1458
1473
// Add a registration.
1459
1474
stream := newTestStream ()
1460
1475
ok , _ , _ := p .Register (stream .ctx , span , hlc .MinTimestamp , nil , /* catchUpIter */
1461
- false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , false , /* withBulkDelivery */
1476
+ false /* withDiff */ , false /* withFiltering */ , false , /* withOmitRemote */
1477
+ noBulkDelivery ,
1462
1478
h .toBufferedStreamIfNeeded (stream ))
1463
1479
require .True (t , ok )
1464
1480
0 commit comments