Skip to content

Commit 4d7f0f3

Browse files
committed
changefeedccl: restore persisted span frontier from job_info
Changefeeds will now restore their persisted span frontiers on restart. Release note: None
1 parent 378e7c3 commit 4d7f0f3

File tree

4 files changed

+131
-1
lines changed

4 files changed

+131
-1
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
1717
"github.com/cockroachdb/cockroach/pkg/clusterversion"
18+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1819
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1920
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
2021
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -470,6 +471,22 @@ func makePlan(
470471
}
471472
}
472473

474+
var resolvedSpans []jobspb.ResolvedSpan
475+
if jobID != 0 {
476+
if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
477+
spans, ok, err := jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
478+
if err != nil {
479+
return err
480+
}
481+
if ok {
482+
resolvedSpans = spans
483+
}
484+
return nil
485+
}); err != nil {
486+
return nil, nil, err
487+
}
488+
}
489+
473490
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
474491
for i, sp := range spanPartitions {
475492
if log.ExpensiveLogEnabled(ctx, 2) {
@@ -493,6 +510,7 @@ func makePlan(
493510
Select: execinfrapb.Expression{Expr: details.Select},
494511
Description: description,
495512
ProgressConfig: progressConfig,
513+
ResolvedSpans: resolvedSpans,
496514
}
497515
}
498516

@@ -508,6 +526,7 @@ func makePlan(
508526
UserProto: execCtx.User().EncodeProto(),
509527
Description: description,
510528
ProgressConfig: progressConfig,
529+
ResolvedSpans: resolvedSpans,
511530
}
512531

513532
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
656656
return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
657657
}
658658

659+
for _, rs := range ca.spec.ResolvedSpans {
660+
if _, err := ca.frontier.Forward(rs.Span, rs.Timestamp); err != nil {
661+
return nil, errors.Wrapf(err, "failed to restore frontier")
662+
}
663+
}
664+
659665
return spans, nil
660666
}
661667

@@ -1480,6 +1486,15 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14801486
return
14811487
}
14821488

1489+
for _, rs := range cf.spec.ResolvedSpans {
1490+
if _, err := cf.frontier.Forward(rs.Span, rs.Timestamp); err != nil {
1491+
log.Changefeed.Warningf(cf.Ctx(),
1492+
"moving to draining due to error restoring frontier: %v", err)
1493+
cf.MoveToDraining(err)
1494+
return
1495+
}
1496+
}
1497+
14831498
if cf.knobs.AfterCoordinatorFrontierRestore != nil {
14841499
cf.knobs.AfterCoordinatorFrontierRestore(cf.frontier)
14851500
}

pkg/ccl/changefeedccl/changefeed_progress_test.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@ import (
1111
"time"
1212

1313
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
14+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1415
"github.com/cockroachdb/cockroach/pkg/jobs"
1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1617
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
18+
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
1720
"github.com/cockroachdb/cockroach/pkg/sql/isql"
21+
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
1822
"github.com/cockroachdb/cockroach/pkg/testutils"
1923
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
24+
"github.com/cockroachdb/cockroach/pkg/util/encoding"
2025
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2126
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/span"
2228
"github.com/cockroachdb/errors"
2329
"github.com/stretchr/testify/require"
2430
)
@@ -54,9 +60,9 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
5460

5561
// Make sure frontier gets persisted to job_info table.
5662
jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
63+
var allSpans []jobspb.ResolvedSpan
5764
testutils.SucceedsSoon(t, func() error {
5865
var found bool
59-
var allSpans []jobspb.ResolvedSpan
6066
if err := s.Server.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
6167
var err error
6268
allSpans, found, err = jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
@@ -74,6 +80,17 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
7480
return nil
7581
})
7682

83+
// Make sure the persisted spans cover the entire table.
84+
fooTableSpan := desctestutils.
85+
TestingGetPublicTableDescriptor(s.Server.DB(), s.Codec, "d", "foo").
86+
PrimaryIndexSpan(s.Codec)
87+
var spanGroup roachpb.SpanGroup
88+
spanGroup.Add(fooTableSpan)
89+
for _, rs := range allSpans {
90+
spanGroup.Sub(rs.Span)
91+
}
92+
require.Zero(t, spanGroup.Len())
93+
7794
// Verify metric count and average latency have sensible values.
7895
testutils.SucceedsSoon(t, func() error {
7996
metricSnapshot := metric.CumulativeSnapshot()
@@ -91,3 +108,74 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
91108

92109
cdcTest(t, testFn, feedTestEnterpriseSinks)
93110
}
111+
112+
// TestChangefeedFrontierRestore verifies that changefeeds will correctly
113+
// restore progress from persisted span frontiers.
114+
func TestChangefeedFrontierRestore(t *testing.T) {
115+
defer leaktest.AfterTest(t)()
116+
defer log.Scope(t).Close(t)
117+
118+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
119+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
120+
ctx := context.Background()
121+
122+
// Disable span-level checkpointing.
123+
changefeedbase.SpanCheckpointInterval.Override(ctx, &s.Server.ClusterSettings().SV, 0)
124+
125+
// Create a table and a changefeed on it.
126+
sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
127+
foo := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no'")
128+
defer closeFeed(t, foo)
129+
jobFeed := foo.(cdctest.EnterpriseTestFeed)
130+
131+
// Pause the changefeed.
132+
require.NoError(t, jobFeed.Pause())
133+
134+
// Insert a few rows into the table and save the insert time.
135+
var tsStr string
136+
sqlDB.QueryRow(t, `INSERT INTO foo VALUES (1), (2), (3), (4), (5), (6)
137+
RETURNING cluster_logical_timestamp()`).Scan(&tsStr)
138+
ts := parseTimeToHLC(t, tsStr)
139+
140+
// Make function to create spans for single rows in the table.
141+
codec := s.Server.Codec()
142+
fooDesc := desctestutils.TestingGetPublicTableDescriptor(s.Server.DB(), codec, "d", "foo")
143+
rowSpan := func(key int64) roachpb.Span {
144+
keyPrefix := func() []byte {
145+
return rowenc.MakeIndexKeyPrefix(codec, fooDesc.GetID(), fooDesc.GetPrimaryIndexID())
146+
}
147+
return roachpb.Span{
148+
Key: encoding.EncodeVarintAscending(keyPrefix(), key),
149+
EndKey: encoding.EncodeVarintAscending(keyPrefix(), key+1),
150+
}
151+
}
152+
153+
// Manually persist a span frontier that manually marks some of the
154+
// inserted rows as resolved already.
155+
hw, err := jobFeed.HighWaterMark()
156+
require.NoError(t, err)
157+
frontier, err := span.MakeFrontierAt(hw, fooDesc.PrimaryIndexSpan(codec))
158+
require.NoError(t, err)
159+
for _, id := range []int64{2, 5} {
160+
_, err := frontier.Forward(rowSpan(id), ts)
161+
require.NoError(t, err)
162+
}
163+
err = s.Server.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
164+
return jobfrontier.Store(ctx, txn, jobFeed.JobID(), "test frontier", frontier)
165+
})
166+
require.NoError(t, err)
167+
168+
// Resume the changefeed.
169+
require.NoError(t, jobFeed.Resume())
170+
171+
// We should receive rows 1, 3, 4, 6 (rows 2 and 5 were marked as resolved).
172+
assertPayloads(t, foo, []string{
173+
`foo: [1]->{"after": {"a": 1}}`,
174+
`foo: [3]->{"after": {"a": 3}}`,
175+
`foo: [4]->{"after": {"a": 4}}`,
176+
`foo: [6]->{"after": {"a": 6}}`,
177+
})
178+
}
179+
180+
cdcTest(t, testFn, feedTestEnterpriseSinks)
181+
}

pkg/sql/execinfrapb/processors_changefeeds.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ message ChangeAggregatorSpec {
6464

6565
// ProgressConfig is the configuration for the changefeed's progress tracking.
6666
optional ChangefeedProgressConfig progress_config = 10;
67+
68+
// ResolvedSpans contains the resolved spans that should be restored
69+
// when the changefeed resumes.
70+
repeated cockroach.sql.jobs.jobspb.ResolvedSpan resolved_spans = 11 [(gogoproto.nullable) = false];
6771
}
6872

6973
// ChangeFrontierSpec is the specification for a processor that receives
@@ -98,6 +102,10 @@ message ChangeFrontierSpec {
98102

99103
// ProgressConfig is the configuration for the changefeed's progress tracking.
100104
optional ChangefeedProgressConfig progress_config = 7;
105+
106+
// ResolvedSpans contains the resolved spans that should be restored
107+
// when the changefeed resumes.
108+
repeated cockroach.sql.jobs.jobspb.ResolvedSpan resolved_spans = 8 [(gogoproto.nullable) = false];
101109
}
102110

103111
// ChangefeedProgressConfig is the configuration for the changefeed's progress tracking.

0 commit comments

Comments
 (0)