Skip to content

Commit 758088d

Browse files
committed
crosscluster: move aggregate range stats collector into utils
This patch moves the aggregate range stats collector for LDR into the replication utils package so that it can be used in PCR. Epic: None Release note: None
1 parent 2b2511f commit 758088d

File tree

7 files changed

+127
-96
lines changed

7 files changed

+127
-96
lines changed

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"metrics.go",
1616
"offline_initial_scan_processor.go",
1717
"purgatory.go",
18-
"range_stats.go",
1918
"replication_statements.go",
2019
"savepoint.go",
2120
"sql_crud_writer.go",
@@ -128,7 +127,6 @@ go_test(
128127
"lww_row_processor_test.go",
129128
"main_test.go",
130129
"purgatory_test.go",
131-
"range_stats_test.go",
132130
"replication_statements_test.go",
133131
"savepoint_test.go",
134132
"sql_row_reader_test.go",

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,10 @@ func (r *logicalReplicationResumer) ingest(
289289
settings: &execCfg.Settings.SV,
290290
job: r.job,
291291
frontierUpdates: heartbeatSender.FrontierUpdates,
292-
rangeStats: newRangeStatsCollector(planInfo.writeProcessorCount),
293-
r: r,
292+
rangeStats: replicationutils.NewAggregateRangeStatsCollector(
293+
planInfo.writeProcessorCount,
294+
),
295+
r: r,
294296
}
295297
rowResultWriter := sql.NewCallbackResultWriter(rh.handleRow)
296298
distSQLReceiver := sql.MakeDistSQLReceiver(
@@ -785,7 +787,7 @@ type rowHandler struct {
785787
job *jobs.Job
786788
frontierUpdates chan hlc.Timestamp
787789

788-
rangeStats rangeStatsByProcessorID
790+
rangeStats replicationutils.AggregateRangeStatsCollector
789791

790792
lastPartitionUpdate time.Time
791793

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
1919
"github.com/cockroachdb/cockroach/pkg/clusterversion"
2020
"github.com/cockroachdb/cockroach/pkg/crosscluster"
21+
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
2122
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
2223
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2324
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -50,7 +51,6 @@ import (
5051
"github.com/cockroachdb/cockroach/pkg/util/tracing"
5152
"github.com/cockroachdb/errors"
5253
"github.com/cockroachdb/logtags"
53-
pbtypes "github.com/gogo/protobuf/types"
5454
)
5555

5656
var logicalReplicationWriterResultType = []*types.T{
@@ -395,7 +395,9 @@ func (lrw *logicalReplicationWriterProcessor) Next() (
395395
lrw.FlowCtx.NodeID.SQLInstanceID(), lrw.FlowCtx.ID, lrw.agg)
396396

397397
case stats := <-lrw.rangeStatsCh:
398-
meta, err := lrw.newRangeStatsProgressMeta(stats)
398+
meta, err := replicationutils.StreamRangeStatsToProgressMeta(
399+
lrw.FlowCtx, lrw.ProcessorID, stats,
400+
)
399401
if err != nil {
400402
lrw.MoveToDrainingAndLogError(err)
401403
return nil, lrw.DrainHelper()
@@ -579,23 +581,6 @@ func (lrw *logicalReplicationWriterProcessor) rangeStats(
579581
}
580582
}
581583

582-
func (lrw *logicalReplicationWriterProcessor) newRangeStatsProgressMeta(
583-
stats *streampb.StreamEvent_RangeStats,
584-
) (*execinfrapb.ProducerMetadata, error) {
585-
asAny, err := pbtypes.MarshalAny(stats)
586-
if err != nil {
587-
return nil, errors.Wrap(err, "unable to convert stats into any proto")
588-
}
589-
return &execinfrapb.ProducerMetadata{
590-
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
591-
NodeID: lrw.FlowCtx.NodeID.SQLInstanceID(),
592-
FlowID: lrw.FlowCtx.ID,
593-
ProcessorID: lrw.ProcessorID,
594-
ProgressDetails: *asAny,
595-
},
596-
}, nil
597-
}
598-
599584
func (lrw *logicalReplicationWriterProcessor) checkpoint(
600585
ctx context.Context, resolvedSpans []jobspb.ResolvedSpan,
601586
) error {

pkg/crosscluster/logical/range_stats.go

Lines changed: 0 additions & 65 deletions
This file was deleted.

pkg/crosscluster/replicationutils/BUILD.bazel

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "replicationutils",
5-
srcs = ["utils.go"],
5+
srcs = [
6+
"stats.go",
7+
"utils.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils",
710
visibility = ["//visibility:public"],
811
deps = [
@@ -18,6 +21,8 @@ go_library(
1821
"//pkg/sql/catalog/descs",
1922
"//pkg/sql/catalog/resolver",
2023
"//pkg/sql/catalog/tabledesc",
24+
"//pkg/sql/execinfra",
25+
"//pkg/sql/execinfrapb",
2126
"//pkg/sql/isql",
2227
"//pkg/sql/parser",
2328
"//pkg/sql/privilege",
@@ -28,18 +33,24 @@ go_library(
2833
"//pkg/testutils/fingerprintutils",
2934
"//pkg/util/ctxgroup",
3035
"//pkg/util/hlc",
36+
"//pkg/util/syncutil",
3137
"//pkg/util/timeutil",
3238
"@com_github_cockroachdb_errors//:errors",
39+
"@com_github_gogo_protobuf//types",
3340
],
3441
)
3542

3643
go_test(
3744
name = "replicationutils_test",
38-
srcs = ["utils_test.go"],
45+
srcs = [
46+
"stats_test.go",
47+
"utils_test.go",
48+
],
3949
embed = [":replicationutils"],
4050
deps = [
4151
"//pkg/clusterversion",
4252
"//pkg/kv/kvpb",
53+
"//pkg/repstream/streampb",
4354
"//pkg/roachpb",
4455
"//pkg/settings/cluster",
4556
"//pkg/storage",
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package replicationutils
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
12+
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
13+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
14+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
15+
"github.com/cockroachdb/errors"
16+
pbtypes "github.com/gogo/protobuf/types"
17+
)
18+
19+
// StreamRangeStatsToProgressMeta converts a range statistics from a rangefeed
20+
// StreamEvent and converts it to a ProducerMetadata that can be passed through
21+
// the DistSQL pipeline.
22+
func StreamRangeStatsToProgressMeta(
23+
flowCtx *execinfra.FlowCtx, procID int32, stats *streampb.StreamEvent_RangeStats,
24+
) (*execinfrapb.ProducerMetadata, error) {
25+
asAny, err := pbtypes.MarshalAny(stats)
26+
if err != nil {
27+
return nil, errors.Wrap(err, "unable to convert range stats to any proto")
28+
}
29+
return &execinfrapb.ProducerMetadata{
30+
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
31+
NodeID: flowCtx.NodeID.SQLInstanceID(),
32+
FlowID: flowCtx.ID,
33+
ProcessorID: procID,
34+
ProgressDetails: *asAny,
35+
},
36+
}, nil
37+
}
38+
39+
// AggregateRangeStatsCollector collects rangefeed StreamEvent range stats from
40+
// multiple processors and aggregates them into a single set of totals.
41+
type AggregateRangeStatsCollector struct {
42+
// mu is held by Add and RollupStats while they access stats.
mu syncutil.Mutex
43+
// stats holds the most recently added range stats, keyed by processor ID.
stats map[int32]*streampb.StreamEvent_RangeStats
44+
// processorCount is the number of processors expected to report. RollupStats
// reports "starting streams" while stats has a different number of entries.
processorCount int
45+
}
46+
47+
func NewAggregateRangeStatsCollector(processorCount int) AggregateRangeStatsCollector {
48+
return AggregateRangeStatsCollector{
49+
stats: make(map[int32]*streampb.StreamEvent_RangeStats),
50+
processorCount: processorCount,
51+
}
52+
}
53+
54+
// Add adds range states from a processor to the collector.
55+
func (r *AggregateRangeStatsCollector) Add(
56+
processorID int32, stats *streampb.StreamEvent_RangeStats,
57+
) {
58+
r.mu.Lock()
59+
defer r.mu.Unlock()
60+
r.stats[processorID] = stats
61+
}
62+
63+
// RollupStats aggregates the collected stats and returns the total range stats,
64+
// the fraction of ranges that have reached the steady state, and a human
65+
// readable status message.
66+
func (r *AggregateRangeStatsCollector) RollupStats() (
67+
streampb.StreamEvent_RangeStats,
68+
float32,
69+
string,
70+
) {
71+
r.mu.Lock()
72+
defer r.mu.Unlock()
73+
var total streampb.StreamEvent_RangeStats
74+
for _, producerStats := range r.stats {
75+
total.RangeCount += producerStats.RangeCount
76+
total.ScanningRangeCount += producerStats.ScanningRangeCount
77+
total.LaggingRangeCount += producerStats.LaggingRangeCount
78+
}
79+
initialScanComplete := total.ScanningRangeCount == 0
80+
incompleteCount := total.ScanningRangeCount
81+
if initialScanComplete {
82+
incompleteCount = total.LaggingRangeCount
83+
}
84+
85+
fractionCompleted := max(
86+
// Use a tiny fraction completed to start with a nearly empty
87+
// progress bar until we get the first batch of range stats.
88+
float32(0.0001),
89+
(float32(total.RangeCount-incompleteCount) / float32(total.RangeCount)))
90+
91+
if len(r.stats) != r.processorCount || total.RangeCount == 0 {
92+
return streampb.StreamEvent_RangeStats{}, 0, fmt.Sprintf("starting streams (%d out of %d)", len(r.stats), r.processorCount)
93+
}
94+
if !initialScanComplete {
95+
return total, fractionCompleted, fmt.Sprintf("initial scan on %d out of %d ranges", total.ScanningRangeCount, total.RangeCount)
96+
}
97+
if total.LaggingRangeCount != 0 {
98+
return total, fractionCompleted, fmt.Sprintf("catching up on %d out of %d ranges", total.LaggingRangeCount, total.RangeCount)
99+
}
100+
return total, 1, ""
101+
}

pkg/crosscluster/logical/range_stats_test.go renamed to pkg/crosscluster/replicationutils/stats_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
// Copyright 2024 The Cockroach Authors.
1+
// Copyright 2025 The Cockroach Authors.
22
//
33
// Use of this software is governed by the CockroachDB Software License
44
// included in the /LICENSE file.
55

6-
package logical
6+
package replicationutils
77

88
import (
99
"testing"
@@ -72,7 +72,7 @@ func TestRangeStats(t *testing.T) {
7272

7373
for _, tc := range testCases {
7474
t.Run(tc.name, func(t *testing.T) {
75-
r := newRangeStatsCollector(3)
75+
r := NewAggregateRangeStatsCollector(3)
7676
for id, stats := range tc.inputStats {
7777
r.Add(id, stats)
7878
}
@@ -82,15 +82,14 @@ func TestRangeStats(t *testing.T) {
8282
require.Equal(t, tc.fraction, fraction)
8383
require.Equal(t, tc.expectedMsg, msg)
8484

85-
rInitializing := newRangeStatsCollector(4)
85+
rInitializing := NewAggregateRangeStatsCollector(4)
8686
for id, stats := range tc.inputStats {
8787
r.Add(id, stats)
8888
}
8989
total, fraction, msg = rInitializing.RollupStats()
9090
require.Equal(t, total, streampb.StreamEvent_RangeStats{})
9191
require.Equal(t, float32(0), fraction)
9292
require.Contains(t, msg, "starting streams")
93-
9493
})
9594
}
9695
}

0 commit comments

Comments
 (0)