Skip to content

Commit f66151f

Browse files
craig[bot]hakuuwwmgartner
committed
143424: raft: use term cache for leader commit term check r=pav-kv a=hakuuww Previously, we avoided calling term(index) for leader commit term check by keeping the first entry index for the current leader term in memory, and comparing against that index. (see #137826) Now, since we have implemented term cache (which stores info of the current leader term's first entryID in this case), the term cache is used for that check to reduce the complexity of our code. (see #142239 ) Part of #136296 Fixes: #143362 Epic: None Release note: None 143463: sql: refactor PlanCDCExpression r=mgartner a=mgartner #### sql: collect CDC presentation outside of plan walker The column presentation of the top node is collected to determine the output columns for the CDC expression. There is no need to do this within the plan node walker, so it has been moved outside. Release note: None #### sql: use InputCount and Input planNode methods to walk CDC plans The `Input` and `InputCount` methods of the `planNode` interface are now used to walk plan node trees of CDC expressions. This continues the effort to deprecate and remove the plan node walkers (see #137620 for more details on the motivation for this). Epic: None Release note: None #### sql: refactor return statements in PlanCDCExpression The `return` statements for error cases now explicitly return an empty `CDCExpressionPlan` for clarity. Release note: None Co-authored-by: Anthony Xu <[email protected]> Co-authored-by: Marcus Gartner <[email protected]>
3 parents b801752 + eee2d2f + 518b3fa commit f66151f

File tree

3 files changed

+56
-55
lines changed

3 files changed

+56
-55
lines changed

pkg/raft/raft.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -335,16 +335,6 @@ type raft struct {
335335

336336
state pb.StateType
337337

338-
// idxPreLeading is the last log index as of when this node became the
339-
// leader. Separates entries proposed by previous leaders from the entries
340-
// proposed by the current leader. Used only in StateLeader, and updated
341-
// when entering StateLeader (in becomeLeader()).
342-
//
343-
// Invariants (when in StateLeader at raft.Term):
344-
// - entries at indices <= idxPreLeading have term < raft.Term
345-
// - entries at indices > idxPreLeading have term == raft.Term
346-
idxPreLeading uint64
347-
348338
// isLearner is true if the local raft node is a learner.
349339
isLearner bool
350340

@@ -1047,7 +1037,9 @@ func (r *raft) maybeCommit() bool {
10471037
// This comparison is equivalent in output to:
10481038
// if !r.raftLog.matchTerm(entryID{term: r.Term, index: index})
10491039
// But avoids (potentially) loading the entry term from storage.
1050-
if index <= r.idxPreLeading {
1040+
// termCache.last() stores the first entryID added to raftLog in the
1041+
// current leader term by invariants.
1042+
if index < r.raftLog.termCache.last().index {
10511043
return false
10521044
}
10531045

@@ -1386,11 +1378,6 @@ func (r *raft) becomeLeader() {
13861378
// could be expensive.
13871379
r.pendingConfIndex = r.raftLog.lastIndex()
13881380

1389-
// Remember the last log index before the term advances to
1390-
// our current (leader) term.
1391-
// See the idxPreLeading comment for more details.
1392-
r.idxPreLeading = r.raftLog.lastIndex()
1393-
13941381
emptyEnt := pb.Entry{Data: nil}
13951382
if !r.appendEntry(emptyEnt) {
13961383
// This won't happen because we just called reset() above.

pkg/raft/raft_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2592,26 +2592,34 @@ func TestLeaderAppResp(t *testing.T) {
25922592
// Follower 2 responds to leader, indicating log index 2 is replicated.
25932593
// Leader tries to commit, but commit index doesn't advance since the index
25942594
// is from a previous term.
2595-
// We hit maybeCommit() and do term check comparison by using the invariant
2596-
// raft.idxPreLeading.
2595+
// We hit maybeCommit() and do term check comparison by using the
2596+
// last "term flip" entryID stored in the termCache.
25972597
// There is no storage access for term in the maybeCommit() code path
25982598
{2, false, 2, 7, 1, 2, 0, 2},
25992599

2600+
// Follower 2 responds to leader, indicating log index 3 is replicated.
2601+
// Leader tries to commit, but commit index doesn't advance since the index
2602+
// is from a previous term. Same as above.
2603+
{3, false, 3, 7, 1, 3, 0, 1},
2604+
26002605
// NB: For the following tests, we are skipping the MsgAppResp for the first
26012606
// 3 entries, by directly processing MsgAppResp for later entries.
26022607
//
26032608
// Follower 2 is StateProbing at 4, it sends MsgAppResp for 4, and is moved
26042609
// to StateReplicate and as many entries as possible are sent to it (5, 6).
26052610
// Correspondingly the Next is then 7 (entry 7 does not exist, indicating
26062611
// the follower will be up to date should it process the emitted MsgApp).
2607-
// accept resp; leader commits; respond with commit index
2612+
// accept resp; leader commits; respond with commit index.
2613+
// maybeCommit() is successful.
26082614
{4, false, 4, 7, 1, 4, 4, 1},
26092615

26102616
// Follower 2 says term2, index5 is already replicated.
26112617
// The leader responds with the updated commit index to follower 2.
2618+
// maybeCommit() is successful.
26122619
{5, false, 5, 7, 1, 5, 5, 1},
26132620
// Follower 2 says term2, index6 is already replicated.
26142621
// The leader responds with the updated commit index to follower 2.
2622+
// maybeCommit() is successful.
26152623
{6, false, 6, 7, 1, 6, 6, 1},
26162624
} {
26172625
t.Run("", func(t *testing.T) {

pkg/sql/distsql_plan_changefeed.go

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func PlanCDCExpression(
9494

9595
familyID, err := extractFamilyID(cdcExpr)
9696
if err != nil {
97-
return cdcPlan, err
97+
return CDCExpressionPlan{}, err
9898
}
9999

100100
cdcCat := &cdcOptCatalog{
@@ -110,7 +110,7 @@ func PlanCDCExpression(
110110

111111
memo, err := opc.buildExecMemo(ctx)
112112
if err != nil {
113-
return cdcPlan, err
113+
return CDCExpressionPlan{}, err
114114
}
115115
if log.V(2) {
116116
log.Infof(ctx, "Optimized CDC expression: %s", memo)
@@ -122,55 +122,61 @@ func PlanCDCExpression(
122122
ctx, &p.curPlan, &p.stmt, newExecFactory(ctx, p), memo, p.SemaCtx(),
123123
p.EvalContext(), allowAutoCommit, disableTelemetryAndPlanGists,
124124
); err != nil {
125-
return cdcPlan, err
125+
return CDCExpressionPlan{}, err
126+
}
127+
128+
// The top node contains the list of columns to return.
129+
presentation := planColumns(p.curPlan.main.planNode)
130+
if len(presentation) == 0 {
131+
return CDCExpressionPlan{}, errors.AssertionFailedf("unable to determine result columns")
126132
}
127133

128134
// Walk the plan, perform sanity checks and extract information we need.
129135
var spans roachpb.Spans
130-
var presentation colinfo.ResultColumns
131-
132-
if err := walkPlan(ctx, p.curPlan.main.planNode, planObserver{
133-
enterNode: func(ctx context.Context, nodeName string, plan planNode) (bool, error) {
134-
switch n := plan.(type) {
135-
case *scanNode:
136-
// Collect spans we wanted to scan. The select statement used for this
137-
// plan should result in a single table scan of primary index span.
138-
if len(spans) > 0 {
139-
return false, errors.AssertionFailedf("unexpected multiple primary index scan operations")
136+
var validatePlanAndCollectSpans func(p planNode) error
137+
validatePlanAndCollectSpans = func(p planNode) error {
138+
switch n := p.(type) {
139+
case *scanNode:
140+
// Collect spans we wanted to scan. The select statement used for
141+
// this plan should result in a single table scan of primary index
142+
// span.
143+
if len(spans) > 0 {
144+
return errors.AssertionFailedf("unexpected multiple primary index scan operations")
145+
}
146+
if n.index.GetID() != n.desc.GetPrimaryIndexID() {
147+
return errors.AssertionFailedf(
148+
"expect scan of primary index, found scan of %d", n.index.GetID())
149+
}
150+
spans = n.spans
151+
case *zeroNode:
152+
return errors.Newf(
153+
"changefeed expression %s does not match any rows", tree.AsString(cdcExpr))
154+
default:
155+
// Recurse into input nodes.
156+
for i, n := 0, p.InputCount(); i < n; i++ {
157+
input, err := p.Input(i)
158+
if err != nil {
159+
return err
140160
}
141-
if n.index.GetID() != n.desc.GetPrimaryIndexID() {
142-
return false, errors.AssertionFailedf(
143-
"expect scan of primary index, found scan of %d", n.index.GetID())
161+
if err = validatePlanAndCollectSpans(input); err != nil {
162+
return err
144163
}
145-
spans = n.spans
146-
case *zeroNode:
147-
return false, errors.Newf(
148-
"changefeed expression %s does not match any rows", tree.AsString(cdcExpr))
149164
}
150-
151-
// Because the walk is top down, the top node is the node containing the
152-
// list of columns to return.
153-
if len(presentation) == 0 {
154-
presentation = planColumns(plan)
155-
}
156-
return true, nil
157-
},
158-
}); err != nil {
159-
return cdcPlan, err
165+
}
166+
return nil
167+
}
168+
if err := validatePlanAndCollectSpans(p.curPlan.main.planNode); err != nil {
169+
return CDCExpressionPlan{}, err
160170
}
161171

162172
if len(spans) == 0 {
163173
// Should have been handled by the zeroNode check above.
164-
return cdcPlan, errors.AssertionFailedf("expected at least 1 span to scan")
165-
}
166-
167-
if len(presentation) == 0 {
168-
return cdcPlan, errors.AssertionFailedf("unable to determine result columns")
174+
return CDCExpressionPlan{}, errors.AssertionFailedf("expected at least 1 span to scan")
169175
}
170176

171177
if len(p.curPlan.subqueryPlans) > 0 || len(p.curPlan.cascades) > 0 ||
172178
len(p.curPlan.checkPlans) > 0 || len(p.curPlan.triggers) > 0 {
173-
return cdcPlan, errors.AssertionFailedf("unexpected query structure")
179+
return CDCExpressionPlan{}, errors.AssertionFailedf("unexpected query structure")
174180
}
175181

176182
planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, LocalDistribution)

0 commit comments

Comments
 (0)