Skip to content

Commit ef85192

Browse files
craig[bot]andyyang890
andcommitted
Merge #152883
152883: cdcprogresspb: move ResolvedTables proto into new package r=aerfrei,asg0451 a=andyyang890 This commit moves the `ResolvedTables` protobuf from `changefeedpb` to a new `cdcprogresspb` package so that it can be added to the processor specs. This was previously not possible because `pkg/sql/execinfrapb` (where the processor specs live) disallows imports of `pkg/sql/parser`, which is a dependency of a different protobuf in `changefeedpb`. The `ProtectedTimestampRecords` protobuf has also been moved for consistency. Informs #148119 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 889f8fc + 8a3d0b0 commit ef85192

File tree

11 files changed

+71
-42
lines changed

11 files changed

+71
-42
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,7 @@ GO_TARGETS = [
931931
"//pkg/ccl/changefeedccl/cdceval:cdceval_test",
932932
"//pkg/ccl/changefeedccl/cdcevent:cdcevent",
933933
"//pkg/ccl/changefeedccl/cdcevent:cdcevent_test",
934+
"//pkg/ccl/changefeedccl/cdcprogresspb:cdcprogresspb",
934935
"//pkg/ccl/changefeedccl/cdctest:cdctest",
935936
"//pkg/ccl/changefeedccl/cdctest:cdctest_test",
936937
"//pkg/ccl/changefeedccl/cdcutils:cdcutils",

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ go_library(
5252
"//pkg/ccl/changefeedccl/avro",
5353
"//pkg/ccl/changefeedccl/cdceval",
5454
"//pkg/ccl/changefeedccl/cdcevent",
55+
"//pkg/ccl/changefeedccl/cdcprogresspb",
5556
"//pkg/ccl/changefeedccl/cdcutils",
5657
"//pkg/ccl/changefeedccl/changefeedbase",
5758
"//pkg/ccl/changefeedccl/changefeedpb",
@@ -238,6 +239,7 @@ go_test(
238239
"//pkg/ccl",
239240
"//pkg/ccl/changefeedccl/cdceval",
240241
"//pkg/ccl/changefeedccl/cdcevent",
242+
"//pkg/ccl/changefeedccl/cdcprogresspb",
241243
"//pkg/ccl/changefeedccl/cdctest",
242244
"//pkg/ccl/changefeedccl/changefeedbase",
243245
"//pkg/ccl/changefeedccl/changefeedpb",
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
load("@rules_proto//proto:defs.bzl", "proto_library")
2+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
3+
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
4+
5+
proto_library(
6+
name = "cdcprogresspb_proto",
7+
srcs = ["progress.proto"],
8+
strip_import_prefix = "/pkg",
9+
visibility = ["//visibility:public"],
10+
deps = [
11+
"//pkg/jobs/jobspb:jobspb_proto",
12+
"//pkg/util/hlc:hlc_proto",
13+
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
14+
],
15+
)
16+
17+
go_proto_library(
18+
name = "cdcprogresspb_go_proto",
19+
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"],
20+
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb",
21+
proto = ":cdcprogresspb_proto",
22+
visibility = ["//visibility:public"],
23+
deps = [
24+
"//pkg/jobs/jobspb",
25+
"//pkg/util/hlc",
26+
"@com_github_gogo_protobuf//gogoproto",
27+
],
28+
)
29+
30+
go_library(
31+
name = "cdcprogresspb",
32+
srcs = ["progress.go"],
33+
embed = [":cdcprogresspb_go_proto"],
34+
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb",
35+
visibility = ["//visibility:public"],
36+
deps = [
37+
"//pkg/sql/catalog/descpb",
38+
"//pkg/util/uuid",
39+
],
40+
)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package cdcprogresspb
7+
8+
import (
9+
_ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" // Needed for progress.proto.
10+
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // Needed for progress.proto.
11+
)

pkg/ccl/changefeedccl/changefeedpb/changefeed.proto renamed to pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
// included in the /LICENSE file.
55

66
syntax = "proto3";
7-
package cockroach.ccl.changefeedccl;
8-
option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb";
7+
package cockroach.ccl.changefeedccl.cdcprogresspb;
8+
option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb";
99

1010
import "gogoproto/gogo.proto";
1111
import "jobs/jobspb/jobs.proto";

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99
"context"
1010
"testing"
1111

12+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1314
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
14-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
1515
"github.com/cockroachdb/cockroach/pkg/jobs"
1616
"github.com/cockroachdb/cockroach/pkg/sql"
1717
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -50,7 +50,7 @@ func TestChangefeedJobInfoResolvedTables(t *testing.T) {
5050
waitForHighwater(t, enterpriseFeed, s.Server.JobRegistry().(*jobs.Registry))
5151

5252
// Make sure the ResolvedTables message was persisted and can be decoded.
53-
var resolvedTables changefeedpb.ResolvedTables
53+
var resolvedTables cdcprogresspb.ResolvedTables
5454
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
5555
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
5656
return readChangefeedJobInfo(ctx, resolvedTablesFilename, &resolvedTables, txn, enterpriseFeed.JobID())

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"sync"
1515
"time"
1616

17+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
1819
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
19-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
2020
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
2121
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2222
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
@@ -1868,7 +1868,7 @@ func (cf *changeFrontier) checkpointJobProgress(
18681868

18691869
// Write per-table progress if enabled.
18701870
if cf.spec.ProgressConfig != nil && cf.spec.ProgressConfig.PerTableTracking {
1871-
resolvedTables := &changefeedpb.ResolvedTables{
1871+
resolvedTables := &cdcprogresspb.ResolvedTables{
18721872
Tables: make(map[descpb.ID]hlc.Timestamp),
18731873
}
18741874
for tableID, tableFrontier := range cf.frontier.Frontiers() {
@@ -1930,7 +1930,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19301930
}
19311931
}()
19321932

1933-
var ptsEntries changefeedpb.ProtectedTimestampRecords
1933+
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
19341934
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, cf.spec.JobID); err != nil {
19351935
return false, err
19361936
}
@@ -1961,7 +1961,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19611961
func (cf *changeFrontier) managePerTableProtectedTimestamps(
19621962
ctx context.Context,
19631963
txn isql.Txn,
1964-
ptsEntries *changefeedpb.ProtectedTimestampRecords,
1964+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
19651965
highwater hlc.Timestamp,
19661966
) (newPTS hlc.Timestamp, updatedPerTablePTS bool, err error) {
19671967
var leastLaggingTimestamp hlc.Timestamp
@@ -2036,7 +2036,7 @@ func (cf *changeFrontier) managePerTableProtectedTimestamps(
20362036
func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20372037
ctx context.Context,
20382038
txn isql.Txn,
2039-
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2039+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20402040
tableIDs []descpb.ID,
20412041
pts protectedts.Storage,
20422042
) error {
@@ -2051,7 +2051,7 @@ func (cf *changeFrontier) releasePerTableProtectedTimestampRecords(
20512051

20522052
func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20532053
ctx context.Context,
2054-
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2054+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20552055
tableID descpb.ID,
20562056
tableHighWater hlc.Timestamp,
20572057
pts protectedts.Storage,
@@ -2075,7 +2075,7 @@ func (cf *changeFrontier) advancePerTableProtectedTimestampRecord(
20752075
func (cf *changeFrontier) createPerTableProtectedTimestampRecord(
20762076
ctx context.Context,
20772077
txn isql.Txn,
2078-
ptsEntries *changefeedpb.ProtectedTimestampRecords,
2078+
ptsEntries *cdcprogresspb.ProtectedTimestampRecords,
20792079
tableID descpb.ID,
20802080
tableHighWater hlc.Timestamp,
20812081
pts protectedts.Storage,

pkg/ccl/changefeedccl/changefeedpb/BUILD.bazel

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,9 @@ load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
44

55
proto_library(
66
name = "changefeedpb_proto",
7-
srcs = [
8-
"changefeed.proto",
9-
"scheduled_changefeed.proto",
10-
],
7+
srcs = ["scheduled_changefeed.proto"],
118
strip_import_prefix = "/pkg",
129
visibility = ["//visibility:public"],
13-
deps = [
14-
"//pkg/jobs/jobspb:jobspb_proto",
15-
"//pkg/util/hlc:hlc_proto",
16-
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
17-
],
1810
)
1911

2012
go_proto_library(
@@ -23,26 +15,16 @@ go_proto_library(
2315
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb",
2416
proto = ":changefeedpb_proto",
2517
visibility = ["//visibility:public"],
26-
deps = [
27-
"//pkg/jobs/jobspb",
28-
"//pkg/util/hlc",
29-
"//pkg/util/uuid", # keep
30-
"@com_github_gogo_protobuf//gogoproto",
31-
],
3218
)
3319

3420
go_library(
3521
name = "changefeedpb",
36-
srcs = [
37-
"changefeed.go",
38-
"marshal.go",
39-
],
22+
srcs = ["marshal.go"],
4023
embed = [":changefeedpb_go_proto"],
4124
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb",
4225
visibility = ["//visibility:public"],
4326
deps = [
4427
"//pkg/cloud",
45-
"//pkg/sql/catalog/descpb",
4628
"//pkg/sql/parser",
4729
"//pkg/sql/sem/tree",
4830
"@com_github_cockroachdb_errors//:errors",

pkg/ccl/changefeedccl/changefeedpb/changefeed.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"time"
1515

1616
"github.com/cockroachdb/cockroach/pkg/base"
17+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
1819
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
19-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb"
2020
"github.com/cockroachdb/cockroach/pkg/jobs"
2121
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2222
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -1033,7 +1033,7 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
10331033

10341034
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
10351035
err = execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
1036-
var ptsEntries changefeedpb.ProtectedTimestampRecords
1036+
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
10371037
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, eFeed.JobID()); err != nil {
10381038
return err
10391039
}
@@ -1131,7 +1131,7 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
11311131
assertTablePTSRecords := func(expectedTables map[descpb.ID]struct{}) {
11321132
testutils.SucceedsSoon(t, func() error {
11331133
return execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
1134-
var ptsEntries changefeedpb.ProtectedTimestampRecords
1134+
var ptsEntries cdcprogresspb.ProtectedTimestampRecords
11351135
if err := readChangefeedJobInfo(ctx, perTableProtectedTimestampsFilename, &ptsEntries, txn, eFeed.JobID()); err != nil {
11361136
return err
11371137
}

0 commit comments

Comments
 (0)