@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
+	pbtypes "github.com/gogo/protobuf/types"
 )
 
 const (
@@ -83,6 +84,14 @@ type streamIngestionFrontier struct {
 	// replicatedTimeAtLastPositiveLagNodeCheck records the replicated time the
 	// last time the lagging node checker detected a lagging node.
 	replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
+
+	rangeStats replicationutils.AggregateRangeStatsCollector
+
+	// lastAggStats stores the last aggregate stats we computed. Because stats
+	// are only updated on a checkpoint event, the stats will be stale until the
+	// next checkpoint and should not be used to update the job status; only a
+	// fresh checkpoint should trigger a job status update.
+	lastAggStats streampb.StreamEvent_RangeStats
 }
 
 var _ execinfra.Processor = &streamIngestionFrontier{}
@@ -138,6 +147,9 @@ func newStreamIngestionFrontierProcessor(
 			return crosscluster.StreamReplicationConsumerHeartbeatFrequency.Get(&flowCtx.Cfg.Settings.SV)
 		}),
 		persistedReplicatedTime: spec.ReplicatedTimeAtStart,
+		rangeStats: replicationutils.NewAggregateRangeStatsCollector(
+			int(spec.NumIngestionProcessors),
+		),
 	}
 	if err := sf.Init(
 		ctx,
@@ -184,6 +196,10 @@ func (sf *streamIngestionFrontier) Next() (
 			if meta.Err != nil {
 				sf.MoveToDrainingAndLogError(nil /* err */)
 			}
+			if err := sf.maybeCollectRangeStats(sf.Ctx(), meta); err != nil {
+				sf.MoveToDrainingAndLogError(err)
+				break
+			}
 			return nil, meta
 		}
 		if row == nil {
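For context, the BulkProcessorProgress metadata consumed here via maybeCollectRangeStats (added later in this diff) is emitted by the stream ingestion processors. The sending side is not shown in this change; a minimal sketch of what it might look like follows, assuming the wrapper type is execinfrapb.RemoteProducerMetadata_BulkProcessorProgress and that the snippet lives in a package with the same imports as the frontier.

// Sketch of the producer side (not part of this diff): an ingestion processor
// packages its per-range stats into the ProgressDetails Any that the frontier
// unmarshals in maybeCollectRangeStats. Type and field names not shown in the
// diff are assumptions.
func makeRangeStatsMeta(
	processorID int32, stats *streampb.StreamEvent_RangeStats,
) (*execinfrapb.ProducerMetadata, error) {
	details, err := pbtypes.MarshalAny(stats)
	if err != nil {
		return nil, errors.Wrap(err, "unable to marshal range stats")
	}
	return &execinfrapb.ProducerMetadata{
		BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
			ProcessorID:     processorID,
			ProgressDetails: *details,
		},
	}, nil
}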
@@ -328,6 +344,9 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 	replicatedTime := f.Frontier()
 	sf.lastPartitionUpdate = timeutil.Now()
 	log.Dev.VInfof(ctx, 2, "persisting replicated time of %s", replicatedTime)
+
+	statusByStats := sf.aggregateAndUpdateRangeMetrics()
+
 	if err := registry.UpdateJobWithTxn(ctx, jobID, nil /* txn */, func(
 		txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
 	) error {
@@ -342,6 +361,8 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 		if replicatedTime.IsSet() && streamProgress.ReplicationStatus == jobspb.InitialScan {
 			streamProgress.ReplicationStatus = jobspb.Replicating
 			md.Progress.StatusMessage = streamProgress.ReplicationStatus.String()
+		} else if statusByStats != "" {
+			md.Progress.StatusMessage = statusByStats
 		}
 
 		// Keep the recorded replicatedTime empty until some advancement has been made
@@ -408,6 +429,44 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 	return nil
 }
 
+func (sf *streamIngestionFrontier) maybeCollectRangeStats(
+	ctx context.Context, meta *execinfrapb.ProducerMetadata,
+) error {
+	if meta.BulkProcessorProgress == nil {
+		log.Dev.VInfof(ctx, 2, "received non-progress producer meta: %v", meta)
+		return nil
+	}
+
+	var stats streampb.StreamEvent_RangeStats
+	if err := pbtypes.UnmarshalAny(&meta.BulkProcessorProgress.ProgressDetails, &stats); err != nil {
+		return errors.Wrap(err, "unable to unmarshal progress details")
+	}
+
+	sf.rangeStats.Add(meta.BulkProcessorProgress.ProcessorID, &stats)
+	return nil
+}
+
+// aggregateAndUpdateRangeMetrics aggregates the range stats collected from each
+// of the ingestion processors and updates the corresponding metrics. If the
+// stats have changed since the last aggregation, it returns a status message
+// with which to update the job status. We do this to avoid overwriting the job
+// status with stale stats, since the stats stay the same until the next
+// checkpoint event.
+func (sf *streamIngestionFrontier) aggregateAndUpdateRangeMetrics() string {
+	aggRangeStats, _, statusMsg := sf.rangeStats.RollupStats()
+	if aggRangeStats.RangeCount != 0 {
+		sf.metrics.ScanningRanges.Update(aggRangeStats.ScanningRangeCount)
+		sf.metrics.CatchupRanges.Update(aggRangeStats.LaggingRangeCount)
+	}
+	if sf.lastAggStats == aggRangeStats {
+		// The stats are unchanged since the last aggregation, so there is no
+		// need to update the job status.
+		return ""
+	}
+	sf.lastAggStats = aggRangeStats
+	return statusMsg
+}
+
 // maybePersistFrontierEntries periodically persists the current state of the
 // frontier to the `system.job_info` table. This information is used to hydrate
 // the execution details that can be requested for the C2C ingestion job. Note,
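The ScanningRanges and CatchupRanges gauges updated in aggregateAndUpdateRangeMetrics are assumed to be added to the frontier's metrics struct elsewhere in this change. With CockroachDB's util/metric package, registering gauges of this sort generally follows the pattern sketched below; the metric names, help strings, and registration site here are illustrative, not taken from this diff.

// Illustrative only: gauge definitions of the kind the new metric updates
// assume. The actual names, help text, and constructor are not shown here.
var (
	metaScanningRanges = metric.Metadata{
		Name:        "physical_replication.scanning_ranges",
		Help:        "Source ranges undergoing an initial scan (updated on checkpoint)",
		Measurement: "Ranges",
		Unit:        metric.Unit_COUNT,
	}
	metaCatchupRanges = metric.Metadata{
		Name:        "physical_replication.catchup_ranges",
		Help:        "Source ranges lagging behind and catching up (updated on checkpoint)",
		Measurement: "Ranges",
		Unit:        metric.Unit_COUNT,
	}
)

// In the ingestion metrics constructor, the gauges would then be built with:
//   ScanningRanges: metric.NewGauge(metaScanningRanges),
//   CatchupRanges:  metric.NewGauge(metaCatchupRanges),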