Skip to content

Commit b1f6b1c

Browse files
committed
changefeedccl: delete per-table resolved timestamps code
Now that we periodically persist the entire span frontier, there is no longer a need for per-table resolved timestamps and thus all the relevant code can be deleted. Release note: None
1 parent 378e7c3 commit b1f6b1c

File tree

5 files changed

+39
-79
lines changed

5 files changed

+39
-79
lines changed

pkg/ccl/changefeedccl/cdcprogresspb/BUILD.bazel

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@ proto_library(
77
srcs = ["progress.proto"],
88
strip_import_prefix = "/pkg",
99
visibility = ["//visibility:public"],
10-
deps = [
11-
"//pkg/jobs/jobspb:jobspb_proto",
12-
"//pkg/util/hlc:hlc_proto",
13-
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
14-
],
10+
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
1511
)
1612

1713
go_proto_library(
@@ -20,11 +16,7 @@ go_proto_library(
2016
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb",
2117
proto = ":cdcprogresspb_proto",
2218
visibility = ["//visibility:public"],
23-
deps = [
24-
"//pkg/jobs/jobspb",
25-
"//pkg/util/hlc",
26-
"@com_github_gogo_protobuf//gogoproto",
27-
],
19+
deps = ["@com_github_gogo_protobuf//gogoproto"],
2820
)
2921

3022
go_library(

pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,6 @@ package cockroach.ccl.changefeedccl.cdcprogresspb;
88
option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb";
99

1010
import "gogoproto/gogo.proto";
11-
import "jobs/jobspb/jobs.proto";
12-
import "util/hlc/timestamp.proto";
13-
14-
// ResolvedTables contains per-table resolved timestamp information.
15-
message ResolvedTables {
16-
map<uint32, util.hlc.Timestamp> tables = 1 [
17-
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
18-
(gogoproto.nullable) = false
19-
];
20-
21-
// TODO(#148124): Write to this field for aggregator-to-frontier messages.
22-
cockroach.sql.jobs.jobspb.ResolvedSpan.BoundaryType boundary_type = 2;
23-
}
2411

2512
// ProtectedTimestampRecords is a map from table descriptor IDs to protected timestamp record IDs.
2613
message ProtectedTimestampRecords {

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ const (
2323
)
2424

2525
const (
26-
// resolvedTablesFilename: ~changefeed/resolved-tables.binpb
27-
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved-tables" + jobInfoFilenameExtension
2826
// perTableProtectedTimestampsFilename: ~changefeed/per-table-protected-timestamps.binpb
2927
perTableProtectedTimestampsFilename = jobInfoFilenamePrefix + "per-table-protected-timestamps" + jobInfoFilenameExtension
3028
)
@@ -49,7 +47,6 @@ func writeChangefeedJobInfo(
4947

5048
// readChangefeedJobInfo reads a changefeed job info protobuf from the
5149
// job_info table. A changefeed-specific filename is required.
52-
// TODO(#148119): Use this function to read.
5350
func readChangefeedJobInfo(
5451
ctx context.Context, filename string, info protoutil.Message, txn isql.Txn, jobID jobspb.JobID,
5552
) error {

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,55 +9,53 @@ import (
99
"context"
1010
"testing"
1111

12+
"github.com/cockroachdb/cockroach/pkg/base"
1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
13-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
15-
"github.com/cockroachdb/cockroach/pkg/jobs"
16-
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1716
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18-
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1918
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2019
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2121
"github.com/stretchr/testify/require"
2222
)
2323

24-
func TestChangefeedJobInfoResolvedTables(t *testing.T) {
24+
func TestChangefeedJobInfoRoundTrip(t *testing.T) {
2525
defer leaktest.AfterTest(t)()
2626
defer log.Scope(t).Close(t)
2727

28-
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
29-
ctx := context.Background()
30-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
31-
32-
// Make sure per-table tracking is enabled.
33-
changefeedbase.TrackPerTableProgress.Override(ctx, &s.Server.ClusterSettings().SV, true)
34-
35-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
36-
sqlDB.Exec(t, `CREATE TABLE bar (x INT PRIMARY KEY, y STRING)`)
37-
feed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
38-
defer closeFeed(t, feed)
39-
40-
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'one')`)
41-
sqlDB.Exec(t, `INSERT INTO bar VALUES (10, 'ten')`)
42-
assertPayloads(t, feed, []string{
43-
`foo: [1]->{"after": {"a": 1, "b": "one"}}`,
44-
`bar: [10]->{"after": {"x": 10, "y": "ten"}}`,
45-
})
46-
47-
// The ResolvedTables message should be persisted to the job_info table
48-
// at the same time as the highwater being set.
49-
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
50-
waitForHighwater(t, enterpriseFeed, s.Server.JobRegistry().(*jobs.Registry))
51-
52-
// Make sure the ResolvedTables message was persisted and can be decoded.
53-
var resolvedTables cdcprogresspb.ResolvedTables
54-
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
55-
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
56-
return readChangefeedJobInfo(ctx, resolvedTablesFilename, &resolvedTables, txn, enterpriseFeed.JobID())
57-
})
58-
require.NoError(t, err)
59-
require.Len(t, resolvedTables.Tables, 2)
28+
ctx := context.Background()
29+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
30+
defer srv.Stopper().Stop(ctx)
31+
32+
jobID := jobspb.JobID(123456)
33+
34+
// Create a basic progress record.
35+
uuid1 := uuid.MakeV4()
36+
uuid2 := uuid.MakeV4()
37+
ptsRecords := cdcprogresspb.ProtectedTimestampRecords{
38+
ProtectedTimestampRecords: map[descpb.ID]*uuid.UUID{
39+
descpb.ID(100): &uuid1,
40+
descpb.ID(200): &uuid2,
41+
},
6042
}
6143

62-
cdcTest(t, testFn, feedTestEnterpriseSinks)
44+
// Write the progress record.
45+
err := srv.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
46+
return writeChangefeedJobInfo(ctx,
47+
perTableProtectedTimestampsFilename, &ptsRecords, txn, jobID)
48+
})
49+
require.NoError(t, err)
50+
51+
// Read the record back.
52+
var readPTSRecords cdcprogresspb.ProtectedTimestampRecords
53+
err = srv.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
54+
return readChangefeedJobInfo(ctx,
55+
perTableProtectedTimestampsFilename, &readPTSRecords, txn, jobID)
56+
})
57+
require.NoError(t, err)
58+
59+
// Verify the read data matches the written data.
60+
require.Equal(t, ptsRecords, readPTSRecords)
6361
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,20 +1877,6 @@ func (cf *changeFrontier) checkpointJobProgress(
18771877
progress.StatusMessage = fmt.Sprintf("running: resolved=%s", frontier)
18781878
}
18791879

1880-
// Write per-table progress if enabled.
1881-
if cf.spec.ProgressConfig != nil && cf.spec.ProgressConfig.PerTableTracking {
1882-
resolvedTables := &cdcprogresspb.ResolvedTables{
1883-
Tables: make(map[descpb.ID]hlc.Timestamp),
1884-
}
1885-
for tableID, tableFrontier := range cf.frontier.Frontiers() {
1886-
resolvedTables.Tables[tableID] = tableFrontier.Frontier()
1887-
}
1888-
1889-
if err := writeChangefeedJobInfo(ctx, resolvedTablesFilename, resolvedTables, txn, cf.spec.JobID); err != nil {
1890-
return errors.Wrap(err, "error writing resolved tables to job info")
1891-
}
1892-
}
1893-
18941880
ju.UpdateProgress(progress)
18951881

18961882
return nil

0 commit comments

Comments
 (0)