|
| 1 | +// Copyright 2025 The Cockroach Authors. |
| 2 | +// |
| 3 | +// Use of this software is governed by the CockroachDB Software License |
| 4 | +// included in the /LICENSE file. |
| 5 | + |
| 6 | +package replicationutils |
| 7 | + |
| 8 | +import ( |
| 9 | + "fmt" |
| 10 | + |
| 11 | + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" |
| 12 | + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" |
| 13 | + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" |
| 14 | + "github.com/cockroachdb/cockroach/pkg/util/syncutil" |
| 15 | + "github.com/cockroachdb/errors" |
| 16 | + pbtypes "github.com/gogo/protobuf/types" |
| 17 | +) |
| 18 | + |
| 19 | +// StreamRangeStatsToProgressMeta converts a range statistics from a rangefeed |
| 20 | +// StreamEvent and converts it to a ProducerMetadata that can be passed through |
| 21 | +// the DistSQL pipeline. |
| 22 | +func StreamRangeStatsToProgressMeta( |
| 23 | + flowCtx *execinfra.FlowCtx, procID int32, stats *streampb.StreamEvent_RangeStats, |
| 24 | +) (*execinfrapb.ProducerMetadata, error) { |
| 25 | + asAny, err := pbtypes.MarshalAny(stats) |
| 26 | + if err != nil { |
| 27 | + return nil, errors.Wrap(err, "unable to convert range stats to any proto") |
| 28 | + } |
| 29 | + return &execinfrapb.ProducerMetadata{ |
| 30 | + BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{ |
| 31 | + NodeID: flowCtx.NodeID.SQLInstanceID(), |
| 32 | + FlowID: flowCtx.ID, |
| 33 | + ProcessorID: procID, |
| 34 | + ProgressDetails: *asAny, |
| 35 | + }, |
| 36 | + }, nil |
| 37 | +} |
| 38 | + |
| 39 | +// AggregateRangeStatsCollector collects rangefeed StreamEvent range stats from |
| 40 | +// multiple processors and aggregates them into single metrics. |
| 41 | +type AggregateRangeStatsCollector struct { |
| 42 | + mu syncutil.Mutex |
| 43 | + stats map[int32]*streampb.StreamEvent_RangeStats |
| 44 | + processorCount int |
| 45 | +} |
| 46 | + |
| 47 | +func NewAggregateRangeStatsCollector(processorCount int) AggregateRangeStatsCollector { |
| 48 | + return AggregateRangeStatsCollector{ |
| 49 | + stats: make(map[int32]*streampb.StreamEvent_RangeStats), |
| 50 | + processorCount: processorCount, |
| 51 | + } |
| 52 | +} |
| 53 | + |
| 54 | +// Add adds range states from a processor to the collector. |
| 55 | +func (r *AggregateRangeStatsCollector) Add( |
| 56 | + processorID int32, stats *streampb.StreamEvent_RangeStats, |
| 57 | +) { |
| 58 | + r.mu.Lock() |
| 59 | + defer r.mu.Unlock() |
| 60 | + r.stats[processorID] = stats |
| 61 | +} |
| 62 | + |
| 63 | +// RollupStats aggregates the collected stats and returns the total range stats, |
| 64 | +// the fraction of ranges that have reached the steady state, and a human |
| 65 | +// readable status message. |
| 66 | +func (r *AggregateRangeStatsCollector) RollupStats() ( |
| 67 | + streampb.StreamEvent_RangeStats, |
| 68 | + float32, |
| 69 | + string, |
| 70 | +) { |
| 71 | + r.mu.Lock() |
| 72 | + defer r.mu.Unlock() |
| 73 | + var total streampb.StreamEvent_RangeStats |
| 74 | + for _, producerStats := range r.stats { |
| 75 | + total.RangeCount += producerStats.RangeCount |
| 76 | + total.ScanningRangeCount += producerStats.ScanningRangeCount |
| 77 | + total.LaggingRangeCount += producerStats.LaggingRangeCount |
| 78 | + } |
| 79 | + initialScanComplete := total.ScanningRangeCount == 0 |
| 80 | + incompleteCount := total.ScanningRangeCount |
| 81 | + if initialScanComplete { |
| 82 | + incompleteCount = total.LaggingRangeCount |
| 83 | + } |
| 84 | + |
| 85 | + fractionCompleted := max( |
| 86 | + // Use a tiny fraction completed to start with a nearly empty |
| 87 | + // progress bar until we get the first batch of range stats. |
| 88 | + float32(0.0001), |
| 89 | + (float32(total.RangeCount-incompleteCount) / float32(total.RangeCount))) |
| 90 | + |
| 91 | + if len(r.stats) != r.processorCount || total.RangeCount == 0 { |
| 92 | + return streampb.StreamEvent_RangeStats{}, 0, fmt.Sprintf("starting streams (%d out of %d)", len(r.stats), r.processorCount) |
| 93 | + } |
| 94 | + if !initialScanComplete { |
| 95 | + return total, fractionCompleted, fmt.Sprintf("initial scan on %d out of %d ranges", total.ScanningRangeCount, total.RangeCount) |
| 96 | + } |
| 97 | + if total.LaggingRangeCount != 0 { |
| 98 | + return total, fractionCompleted, fmt.Sprintf("catching up on %d out of %d ranges", total.LaggingRangeCount, total.RangeCount) |
| 99 | + } |
| 100 | + return total, 1, "" |
| 101 | +} |
0 commit comments