Skip to content

Commit d8e21ad

Browse files
craig[bot]andyyang890tbgiskettaneh
committed
153621: changefeedccl: restore persisted span frontier from job_info r=KeithCh a=andyyang890 Changefeeds will now restore their persisted span frontiers on restart. Fixes #153201 Release note: None 153652: mmaprototype: thread ctx r=tbg a=tbg This will make the traces in #153584 more informative. Epic: CRDB-49117 153659: changefeedccl: delete per-table resolved timestamps code r=log-head a=andyyang890 Now that we periodically persist the entire span frontier, there is no longer a need for per-table resolved timestamps and thus all the relevant code can be deleted. Fixes #153492 Release note: None 153680: roachtest: increase the verbosity of runMultiStoreRemove r=iskettaneh a=iskettaneh Recent failures in runMultiStoreRemove seems to indicate a potential race between follower replica removal and leader fortification. References: #153517 Release note: None Co-authored-by: Andy Yang <[email protected]> Co-authored-by: Tobias Grieger <[email protected]> Co-authored-by: Ibrahim Kettaneh <[email protected]>
5 parents 66a3351 + 4d7f0f3 + a2aa31a + b1f6b1c + 9e8e207 commit d8e21ad

File tree

14 files changed

+211
-108
lines changed

14 files changed

+211
-108
lines changed

pkg/ccl/changefeedccl/cdcprogresspb/BUILD.bazel

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@ proto_library(
77
srcs = ["progress.proto"],
88
strip_import_prefix = "/pkg",
99
visibility = ["//visibility:public"],
10-
deps = [
11-
"//pkg/jobs/jobspb:jobspb_proto",
12-
"//pkg/util/hlc:hlc_proto",
13-
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
14-
],
10+
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
1511
)
1612

1713
go_proto_library(
@@ -20,11 +16,7 @@ go_proto_library(
2016
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb",
2117
proto = ":cdcprogresspb_proto",
2218
visibility = ["//visibility:public"],
23-
deps = [
24-
"//pkg/jobs/jobspb",
25-
"//pkg/util/hlc",
26-
"@com_github_gogo_protobuf//gogoproto",
27-
],
19+
deps = ["@com_github_gogo_protobuf//gogoproto"],
2820
)
2921

3022
go_library(

pkg/ccl/changefeedccl/cdcprogresspb/progress.proto

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,6 @@ package cockroach.ccl.changefeedccl.cdcprogresspb;
88
option go_package = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb";
99

1010
import "gogoproto/gogo.proto";
11-
import "jobs/jobspb/jobs.proto";
12-
import "util/hlc/timestamp.proto";
13-
14-
// ResolvedTables contains per-table resolved timestamp information.
15-
message ResolvedTables {
16-
map<uint32, util.hlc.Timestamp> tables = 1 [
17-
(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID",
18-
(gogoproto.nullable) = false
19-
];
20-
21-
// TODO(#148124): Write to this field for aggregator-to-frontier messages.
22-
cockroach.sql.jobs.jobspb.ResolvedSpan.BoundaryType boundary_type = 2;
23-
}
2411

2512
// ProtectedTimestampRecords is a map from table descriptor IDs to protected timestamp record IDs.
2613
message ProtectedTimestampRecords {

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
1717
"github.com/cockroachdb/cockroach/pkg/clusterversion"
18+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1819
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1920
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
2021
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -470,6 +471,22 @@ func makePlan(
470471
}
471472
}
472473

474+
var resolvedSpans []jobspb.ResolvedSpan
475+
if jobID != 0 {
476+
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
477+
spans, ok, err := jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
478+
if err != nil {
479+
return err
480+
}
481+
if ok {
482+
resolvedSpans = spans
483+
}
484+
return nil
485+
}); err != nil {
486+
return nil, nil, err
487+
}
488+
}
489+
473490
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
474491
for i, sp := range spanPartitions {
475492
if log.ExpensiveLogEnabled(ctx, 2) {
@@ -493,6 +510,7 @@ func makePlan(
493510
Select: execinfrapb.Expression{Expr: details.Select},
494511
Description: description,
495512
ProgressConfig: progressConfig,
513+
ResolvedSpans: resolvedSpans,
496514
}
497515
}
498516

@@ -508,6 +526,7 @@ func makePlan(
508526
UserProto: execCtx.User().EncodeProto(),
509527
Description: description,
510528
ProgressConfig: progressConfig,
529+
ResolvedSpans: resolvedSpans,
511530
}
512531

513532
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_job_info.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ const (
2323
)
2424

2525
const (
26-
// resolvedTablesFilename: ~changefeed/resolved-tables.binpb
27-
resolvedTablesFilename = jobInfoFilenamePrefix + "resolved-tables" + jobInfoFilenameExtension
2826
// perTableProtectedTimestampsFilename: ~changefeed/per-table-protected-timestamps.binpb
2927
perTableProtectedTimestampsFilename = jobInfoFilenamePrefix + "per-table-protected-timestamps" + jobInfoFilenameExtension
3028
)
@@ -49,7 +47,6 @@ func writeChangefeedJobInfo(
4947

5048
// readChangefeedJobInfo reads a changefeed job info protobuf from the
5149
// job_info table. A changefeed-specific filename is required.
52-
// TODO(#148119): Use this function to read.
5350
func readChangefeedJobInfo(
5451
ctx context.Context, filename string, info protoutil.Message, txn isql.Txn, jobID jobspb.JobID,
5552
) error {

pkg/ccl/changefeedccl/changefeed_job_info_test.go

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,55 +9,53 @@ import (
99
"context"
1010
"testing"
1111

12+
"github.com/cockroachdb/cockroach/pkg/base"
1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcprogresspb"
13-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
15-
"github.com/cockroachdb/cockroach/pkg/jobs"
16-
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1716
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18-
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1918
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2019
"github.com/cockroachdb/cockroach/pkg/util/log"
20+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2121
"github.com/stretchr/testify/require"
2222
)
2323

24-
func TestChangefeedJobInfoResolvedTables(t *testing.T) {
24+
func TestChangefeedJobInfoRoundTrip(t *testing.T) {
2525
defer leaktest.AfterTest(t)()
2626
defer log.Scope(t).Close(t)
2727

28-
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
29-
ctx := context.Background()
30-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
31-
32-
// Make sure per-table tracking is enabled.
33-
changefeedbase.TrackPerTableProgress.Override(ctx, &s.Server.ClusterSettings().SV, true)
34-
35-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
36-
sqlDB.Exec(t, `CREATE TABLE bar (x INT PRIMARY KEY, y STRING)`)
37-
feed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
38-
defer closeFeed(t, feed)
39-
40-
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'one')`)
41-
sqlDB.Exec(t, `INSERT INTO bar VALUES (10, 'ten')`)
42-
assertPayloads(t, feed, []string{
43-
`foo: [1]->{"after": {"a": 1, "b": "one"}}`,
44-
`bar: [10]->{"after": {"x": 10, "y": "ten"}}`,
45-
})
46-
47-
// The ResolvedTables message should be persisted to the job_info table
48-
// at the same time as the highwater being set.
49-
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
50-
waitForHighwater(t, enterpriseFeed, s.Server.JobRegistry().(*jobs.Registry))
51-
52-
// Make sure the ResolvedTables message was persisted and can be decoded.
53-
var resolvedTables cdcprogresspb.ResolvedTables
54-
execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
55-
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
56-
return readChangefeedJobInfo(ctx, resolvedTablesFilename, &resolvedTables, txn, enterpriseFeed.JobID())
57-
})
58-
require.NoError(t, err)
59-
require.Len(t, resolvedTables.Tables, 2)
28+
ctx := context.Background()
29+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
30+
defer srv.Stopper().Stop(ctx)
31+
32+
jobID := jobspb.JobID(123456)
33+
34+
// Create a basic progress record.
35+
uuid1 := uuid.MakeV4()
36+
uuid2 := uuid.MakeV4()
37+
ptsRecords := cdcprogresspb.ProtectedTimestampRecords{
38+
ProtectedTimestampRecords: map[descpb.ID]*uuid.UUID{
39+
descpb.ID(100): &uuid1,
40+
descpb.ID(200): &uuid2,
41+
},
6042
}
6143

62-
cdcTest(t, testFn, feedTestEnterpriseSinks)
44+
// Write the progress record.
45+
err := srv.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
46+
return writeChangefeedJobInfo(ctx,
47+
perTableProtectedTimestampsFilename, &ptsRecords, txn, jobID)
48+
})
49+
require.NoError(t, err)
50+
51+
// Read the record back.
52+
var readPTSRecords cdcprogresspb.ProtectedTimestampRecords
53+
err = srv.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
54+
return readChangefeedJobInfo(ctx,
55+
perTableProtectedTimestampsFilename, &readPTSRecords, txn, jobID)
56+
})
57+
require.NoError(t, err)
58+
59+
// Verify the read data matches the written data.
60+
require.Equal(t, ptsRecords, readPTSRecords)
6361
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
656656
return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
657657
}
658658

659+
for _, rs := range ca.spec.ResolvedSpans {
660+
if _, err := ca.frontier.Forward(rs.Span, rs.Timestamp); err != nil {
661+
return nil, errors.Wrapf(err, "failed to restore frontier")
662+
}
663+
}
664+
659665
return spans, nil
660666
}
661667

@@ -1480,6 +1486,15 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14801486
return
14811487
}
14821488

1489+
for _, rs := range cf.spec.ResolvedSpans {
1490+
if _, err := cf.frontier.Forward(rs.Span, rs.Timestamp); err != nil {
1491+
log.Changefeed.Warningf(cf.Ctx(),
1492+
"moving to draining due to error restoring frontier: %v", err)
1493+
cf.MoveToDraining(err)
1494+
return
1495+
}
1496+
}
1497+
14831498
if cf.knobs.AfterCoordinatorFrontierRestore != nil {
14841499
cf.knobs.AfterCoordinatorFrontierRestore(cf.frontier)
14851500
}
@@ -1877,20 +1892,6 @@ func (cf *changeFrontier) checkpointJobProgress(
18771892
progress.StatusMessage = fmt.Sprintf("running: resolved=%s", frontier)
18781893
}
18791894

1880-
// Write per-table progress if enabled.
1881-
if cf.spec.ProgressConfig != nil && cf.spec.ProgressConfig.PerTableTracking {
1882-
resolvedTables := &cdcprogresspb.ResolvedTables{
1883-
Tables: make(map[descpb.ID]hlc.Timestamp),
1884-
}
1885-
for tableID, tableFrontier := range cf.frontier.Frontiers() {
1886-
resolvedTables.Tables[tableID] = tableFrontier.Frontier()
1887-
}
1888-
1889-
if err := writeChangefeedJobInfo(ctx, resolvedTablesFilename, resolvedTables, txn, cf.spec.JobID); err != nil {
1890-
return errors.Wrap(err, "error writing resolved tables to job info")
1891-
}
1892-
}
1893-
18941895
ju.UpdateProgress(progress)
18951896

18961897
return nil

pkg/ccl/changefeedccl/changefeed_progress_test.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@ import (
1111
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1415
"github.com/cockroachdb/cockroach/pkg/jobs"
1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1617
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
18+
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
1720
"github.com/cockroachdb/cockroach/pkg/sql/isql"
21+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
1822
"github.com/cockroachdb/cockroach/pkg/testutils"
1923
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
24+
"github.com/cockroachdb/cockroach/pkg/util/encoding"
2025
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2126
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/span"
2228
"github.com/cockroachdb/errors"
2329
"github.com/stretchr/testify/require"
2430
)
@@ -54,9 +60,9 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
5460

5561
// Make sure frontier gets persisted to job_info table.
5662
jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
63+
var allSpans []jobspb.ResolvedSpan
5764
testutils.SucceedsSoon(t, func() error {
5865
var found bool
59-
var allSpans []jobspb.ResolvedSpan
6066
if err := s.Server.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
6167
var err error
6268
allSpans, found, err = jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
@@ -74,6 +80,17 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
7480
return nil
7581
})
7682

83+
// Make sure the persisted spans cover the entire table.
84+
fooTableSpan := desctestutils.
85+
TestingGetPublicTableDescriptor(s.Server.DB(), s.Codec, "d", "foo").
86+
PrimaryIndexSpan(s.Codec)
87+
var spanGroup roachpb.SpanGroup
88+
spanGroup.Add(fooTableSpan)
89+
for _, rs := range allSpans {
90+
spanGroup.Sub(rs.Span)
91+
}
92+
require.Zero(t, spanGroup.Len())
93+
7794
// Verify metric count and average latency have sensible values.
7895
testutils.SucceedsSoon(t, func() error {
7996
metricSnapshot := metric.CumulativeSnapshot()
@@ -91,3 +108,74 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
91108

92109
cdcTest(t, testFn, feedTestEnterpriseSinks)
93110
}
111+
112+
// TestChangefeedFrontierRestore verifies that changefeeds will correctly
113+
// restore progress from persisted span frontiers.
114+
func TestChangefeedFrontierRestore(t *testing.T) {
115+
defer leaktest.AfterTest(t)()
116+
defer log.Scope(t).Close(t)
117+
118+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
119+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
120+
ctx := context.Background()
121+
122+
// Disable span-level checkpointing.
123+
changefeedbase.SpanCheckpointInterval.Override(ctx, &s.Server.ClusterSettings().SV, 0)
124+
125+
// Create a table and a changefeed on it.
126+
sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
127+
foo := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no'")
128+
defer closeFeed(t, foo)
129+
jobFeed := foo.(cdctest.EnterpriseTestFeed)
130+
131+
// Pause the changefeed.
132+
require.NoError(t, jobFeed.Pause())
133+
134+
// Insert a few rows into the table and save the insert time.
135+
var tsStr string
136+
sqlDB.QueryRow(t, `INSERT INTO foo VALUES (1), (2), (3), (4), (5), (6)
137+
RETURNING cluster_logical_timestamp()`).Scan(&tsStr)
138+
ts := parseTimeToHLC(t, tsStr)
139+
140+
// Make function to create spans for single rows in the table.
141+
codec := s.Server.Codec()
142+
fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.Server.DB(), codec, "d", "foo")
143+
rowSpan := func(key int64) roachpb.Span {
144+
keyPrefix := func() []byte {
145+
return rowenc.MakeIndexKeyPrefix(codec, fooDesc.GetID(), fooDesc.GetPrimaryIndexID())
146+
}
147+
return roachpb.Span{
148+
Key: encoding.EncodeVarintAscending(keyPrefix(), key),
149+
EndKey: encoding.EncodeVarintAscending(keyPrefix(), key+1),
150+
}
151+
}
152+
153+
// Manually persist a span frontier that manually marks some of the
154+
// inserted rows as resolved already.
155+
hw, err := jobFeed.HighWaterMark()
156+
require.NoError(t, err)
157+
frontier, err := span.MakeFrontierAt(hw, fooDesc.PrimaryIndexSpan(codec))
158+
require.NoError(t, err)
159+
for _, id := range []int64{2, 5} {
160+
_, err := frontier.Forward(rowSpan(id), ts)
161+
require.NoError(t, err)
162+
}
163+
err = s.Server.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
164+
return jobfrontier.Store(ctx, txn, jobFeed.JobID(), "test frontier", frontier)
165+
})
166+
require.NoError(t, err)
167+
168+
// Resume the changefeed.
169+
require.NoError(t, jobFeed.Resume())
170+
171+
// We should receive rows 1, 3, 4, 6 (rows 2 and 5 were marked as resolved).
172+
assertPayloads(t, foo, []string{
173+
`foo: [1]->{"after": {"a": 1}}`,
174+
`foo: [3]->{"after": {"a": 3}}`,
175+
`foo: [4]->{"after": {"a": 4}}`,
176+
`foo: [6]->{"after": {"a": 6}}`,
177+
})
178+
}
179+
180+
cdcTest(t, testFn, feedTestEnterpriseSinks)
181+
}

pkg/cmd/roachtest/tests/multi_store_remove.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ func runMultiStoreRemove(ctx context.Context, t test.Test, c cluster.Cluster) {
6060
startSettings := install.MakeClusterSettings()
6161
// Speed up the replicate queue.
6262
startSettings.Env = append(startSettings.Env, "COCKROACH_SCAN_INTERVAL=30s")
63+
64+
// Increase the verbosity of this test to help debug future failures.
65+
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
66+
"--vmodule=*=1,raft=3",
67+
)
68+
6369
c.Start(ctx, t.L(), startOpts, startSettings, c.Range(1, 3))
6470

6571
// Confirm that there are 6 stores live.

pkg/kv/kvserver/allocator/mmaprototype/allocator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type ChangeOptions struct {
2929
// - changes to this interface to make the integration for the new allocator
3030
// be less different than integration with the old allocator.
3131
type Allocator interface {
32-
LoadSummaryForAllStores() string
32+
LoadSummaryForAllStores(context.Context) string
3333
Metrics() *MMAMetrics
3434
// Methods to update the state of the external world. The allocator starts
3535
// with no knowledge.

0 commit comments

Comments
 (0)