@@ -124,10 +124,18 @@ func MakeBulkAdder(
124
124
settings : settings ,
125
125
skipDuplicates : opts .SkipDuplicates ,
126
126
disallowShadowingBelow : opts .DisallowShadowingBelow ,
127
- batchTS : opts .BatchTimestamp ,
128
- writeAtBatchTS : opts .WriteAtBatchTimestamp ,
129
- mem : bulkMon .MakeConcurrentBoundAccount (),
130
- limiter : sendLimiter ,
127
+ batch : batch {
128
+ // TODO(jeffswenson): As far as I can tell, setting the batch timestamp here does nothing. The API
129
+ // is actively misleading.
130
+ //
131
+ // It used to do something before #75275 was merged, but now it is always test to hlc.Timestamp{}
132
+ // by reset before the first flush. I think we should clean this up, but cleaning it up properly
133
+ // requires cleanups throughout the schema changer code base to replace "WriteAsOf" with "ReadAsOf".
134
+ ts : opts .BatchTimestamp ,
135
+ },
136
+ writeAtBatchTS : opts .WriteAtBatchTimestamp ,
137
+ mem : bulkMon .MakeConcurrentBoundAccount (),
138
+ limiter : sendLimiter ,
131
139
},
132
140
timestamp : timestamp ,
133
141
maxBufferLimit : opts .MaxBufferSize ,
@@ -201,7 +209,7 @@ func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte)
201
209
return b .curBuf .append (key , value )
202
210
}
203
211
204
- b .sink .currentStats .FlushesDueToSize ++
212
+ b .sink .batch . stats .FlushesDueToSize ++
205
213
log .VEventf (ctx , 3 , "%s adder triggering flush of %s of KVs in %s buffer" ,
206
214
b .name , b .curBuf .KVSize (), b .bufferedMemSize ())
207
215
@@ -264,7 +272,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
264
272
}
265
273
266
274
func (b * BufferingAdder ) doFlush (ctx context.Context , forSize bool ) error {
267
- b .sink .currentStats .FillWait += timeutil .Since (b .lastFlush )
275
+ b .sink .batch . stats .FillWait += timeutil .Since (b .lastFlush )
268
276
269
277
if b .bufferedKeys () == 0 {
270
278
if b .onFlush != nil {
@@ -275,7 +283,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
275
283
return nil
276
284
}
277
285
b .sink .Reset (ctx )
278
- b .sink .currentStats .BufferFlushes ++
286
+ b .sink .batch . stats .BufferFlushes ++
279
287
280
288
var before * bulkpb.IngestionPerformanceStats
281
289
var beforeSize int64
@@ -284,7 +292,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
284
292
b .sink .mu .Lock ()
285
293
before = b .sink .mu .totalStats .Identity ().(* bulkpb.IngestionPerformanceStats )
286
294
before .Combine (& b .sink .mu .totalStats )
287
- before .Combine (& b .sink .currentStats )
295
+ before .Combine (& b .sink .batch . stats )
288
296
beforeSize = b .sink .mu .totalBulkOpSummary .DataSize
289
297
b .sink .mu .Unlock ()
290
298
}
@@ -297,7 +305,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
297
305
mvccKey := storage.MVCCKey {Timestamp : b .timestamp }
298
306
299
307
beforeFlush := timeutil .Now ()
300
- b .sink .currentStats .SortWait += beforeFlush .Sub (beforeSort )
308
+ b .sink .batch . stats .SortWait += beforeFlush .Sub (beforeSort )
301
309
302
310
// If this is the first flush and is due to size, if it was unsorted then
303
311
// create initial splits if requested before flushing.
@@ -340,14 +348,14 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
340
348
}
341
349
}
342
350
343
- b .sink .currentStats .FlushWait += timeutil .Since (beforeFlush )
351
+ b .sink .batch . stats .FlushWait += timeutil .Since (beforeFlush )
344
352
345
353
if log .V (3 ) && before != nil {
346
354
b .sink .mu .Lock ()
347
355
written := b .sink .mu .totalBulkOpSummary .DataSize - beforeSize
348
356
afterStats := b .sink .mu .totalStats .Identity ().(* bulkpb.IngestionPerformanceStats )
349
357
afterStats .Combine (& b .sink .mu .totalStats )
350
- afterStats .Combine (& b .sink .currentStats )
358
+ afterStats .Combine (& b .sink .batch . stats )
351
359
b .sink .mu .Unlock ()
352
360
353
361
files := afterStats .Batches - before .Batches
@@ -457,24 +465,24 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
457
465
splitsWait := beforeScatters .Sub (beforeSplits )
458
466
log .Infof (ctx , "%s adder created %d initial splits in %v from %d keys in %s buffer" ,
459
467
b .name , len (toScatter ), timing (splitsWait ), b .curBuf .Len (), b .curBuf .MemSize ())
460
- b .sink .currentStats .Splits += int64 (len (toScatter ))
461
- b .sink .currentStats .SplitWait += splitsWait
468
+ b .sink .batch . stats .Splits += int64 (len (toScatter ))
469
+ b .sink .batch . stats .SplitWait += splitsWait
462
470
463
471
for _ , splitKey := range toScatter {
464
472
resp , err := b .sink .db .AdminScatter (ctx , splitKey , 0 /* maxSize */ )
465
473
if err != nil {
466
474
log .Warningf (ctx , "failed to scatter: %v" , err )
467
475
continue
468
476
}
469
- b .sink .currentStats .Scatters ++
470
- b .sink .currentStats .ScatterMoved += resp .ReplicasScatteredBytes
477
+ b .sink .batch . stats .Scatters ++
478
+ b .sink .batch . stats .ScatterMoved += resp .ReplicasScatteredBytes
471
479
if resp .ReplicasScatteredBytes > 0 {
472
480
log .VEventf (ctx , 1 , "pre-split scattered %s in non-empty range %s" ,
473
481
sz (resp .ReplicasScatteredBytes ), resp .RangeInfos [0 ].Desc .KeySpan ().AsRawSpanWithNoLocals ())
474
482
}
475
483
}
476
484
scattersWait := timeutil .Since (beforeScatters )
477
- b .sink .currentStats .ScatterWait += scattersWait
485
+ b .sink .batch . stats .ScatterWait += scattersWait
478
486
log .Infof (ctx , "%s adder scattered %d initial split spans in %v" ,
479
487
b .name , len (toScatter ), timing (scattersWait ))
480
488
0 commit comments