Skip to content

Commit a8886b4

Browse files
craig[bot]spilchen
andcommitted
Merge #151243
151243: sql/ttl: refactor progress tracking behind interface for future checkpointing r=spilchen a=spilchen This change introduces a progressTracker interface to abstract how TTL job progress is tracked. The existing behavior is now encapsulated in a legacyProgressTracker implementation. While TTL jobs could previously resume after interruption, they would always start from the beginning of the table, effectively discarding any partial progress. This refactor prepares for future support of span-level progress tracking, enabling jobs to resume from where they left off. A follow-up change will add actual checkpointing support using this interface, allowing resumable TTL jobs to skip spans already processed. This commit is scoped to the interface definition and refactoring of the current implementation under it. Informs #140514 Release note: none Co-authored-by: Matt Spilchen <[email protected]>
2 parents 29d7b10 + 91bad76 commit a8886b4

File tree

5 files changed

+417
-299
lines changed

5 files changed

+417
-299
lines changed

pkg/sql/ttl/ttljob/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "ttljob",
55
srcs = [
6+
"progress.go",
67
"ttljob.go",
78
"ttljob_metrics.go",
89
"ttljob_processor.go",
@@ -66,6 +67,7 @@ go_test(
6667
size = "large",
6768
srcs = [
6869
"main_test.go",
70+
"progress_test.go",
6971
"ttljob_internal_test.go",
7072
"ttljob_plans_test.go",
7173
"ttljob_processor_internal_test.go",

pkg/sql/ttl/ttljob/progress.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package ttljob
7+
8+
import (
9+
"context"
10+
"math/rand"
11+
"time"
12+
13+
"github.com/cockroachdb/cockroach/pkg/jobs"
14+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
16+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
17+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
18+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
19+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
20+
"github.com/cockroachdb/errors"
21+
pbtypes "github.com/gogo/protobuf/types"
22+
)
23+
24+
type progressTracker interface {
25+
// initJobProgress writes the initial job progress to the job record with the correct
26+
// span count determined after planning. This should be called after makePlan.
27+
initJobProgress(ctx context.Context, jobSpanCount int64) error
28+
// handleProgressUpdate handles incoming processor metadata and performs any necessary
29+
// job updates. Each implementation determines how to handle the update (immediate vs deferred).
30+
handleProgressUpdate(ctx context.Context, meta *execinfrapb.ProducerMetadata) error
31+
}
32+
33+
// legacyProgressTracker tracks TTL job progress without span checkpointing.
34+
// Progress is computed by counting processed spans against the total span count.
35+
// Job restarts reset progress to 0%. Updates are gated to occur approximately
36+
// every 1% of spans processed and at least 60 seconds apart.
37+
type legacyProgressTracker struct {
38+
job *jobs.Job
39+
40+
mu struct {
41+
syncutil.Mutex
42+
// lastUpdateTime is the wall time of the last job progress update.
43+
// Used to gate how often we persist job progress in refreshJobProgress.
44+
lastUpdateTime time.Time
45+
// lastSpanCount is the number of spans processed as of the last persisted update.
46+
lastSpanCount int64
47+
// updateEvery determines how many spans must be processed before we persist a new update.
48+
updateEvery int64
49+
// updateEveryDuration is the minimum time that must pass before allowing another progress update.
50+
updateEveryDuration time.Duration
51+
}
52+
}
53+
54+
var _ progressTracker = (*legacyProgressTracker)(nil)
55+
56+
func newLegacyProgressTracker(job *jobs.Job) *legacyProgressTracker {
57+
return &legacyProgressTracker{
58+
job: job,
59+
}
60+
}
61+
62+
// setupUpdateFrequency configures the progress update frequency settings.
63+
// To avoid too many progress updates, especially if a lot of the spans don't
64+
// have expired rows, we will gate the updates to approximately every 1% of
65+
// spans processed, and at least 60 seconds apart with jitter.
66+
func (t *legacyProgressTracker) setupUpdateFrequency(jobSpanCount int64) {
67+
t.mu.Lock()
68+
defer t.mu.Unlock()
69+
t.mu.updateEvery = max(1, jobSpanCount/100)
70+
t.mu.updateEveryDuration = 60*time.Second + time.Duration(rand.Int63n(10*1000))*time.Millisecond
71+
t.mu.lastUpdateTime = timeutil.Now()
72+
t.mu.lastSpanCount = 0
73+
}
74+
75+
// initJobProgress implements the progressTracker interface.
76+
func (t *legacyProgressTracker) initJobProgress(ctx context.Context, jobSpanCount int64) error {
77+
t.setupUpdateFrequency(jobSpanCount)
78+
79+
// Write initial progress to job record
80+
return t.job.NoTxn().Update(ctx, func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
81+
rowLevelTTL := &jobspb.RowLevelTTLProgress{
82+
JobTotalSpanCount: jobSpanCount,
83+
JobProcessedSpanCount: 0,
84+
}
85+
86+
progress := &jobspb.Progress{
87+
Details: &jobspb.Progress_RowLevelTTL{RowLevelTTL: rowLevelTTL},
88+
Progress: &jobspb.Progress_FractionCompleted{
89+
FractionCompleted: 0,
90+
},
91+
}
92+
ju.UpdateProgress(progress)
93+
return nil
94+
})
95+
}
96+
97+
// refreshJobProgress computes updated job progress from processor metadata.
98+
// It may return nil to skip immediate persistence, or a Progress to trigger an update.
99+
func (t *legacyProgressTracker) refreshJobProgress(
100+
_ context.Context, md *jobs.JobMetadata, meta *execinfrapb.ProducerMetadata,
101+
) (*jobspb.Progress, error) {
102+
if meta.BulkProcessorProgress == nil {
103+
return nil, nil
104+
}
105+
var incomingProcProgress jobspb.RowLevelTTLProcessorProgress
106+
if err := pbtypes.UnmarshalAny(&meta.BulkProcessorProgress.ProgressDetails, &incomingProcProgress); err != nil {
107+
return nil, errors.Wrapf(err, "unable to unmarshal ttl progress details")
108+
}
109+
110+
orig := md.Progress.GetRowLevelTTL()
111+
if orig == nil {
112+
return nil, errors.New("job progress does not contain RowLevelTTL details")
113+
}
114+
rowLevelTTL := protoutil.Clone(orig).(*jobspb.RowLevelTTLProgress)
115+
116+
// Update or insert the incoming processor progress.
117+
foundMatchingProcessor := false
118+
for i := range rowLevelTTL.ProcessorProgresses {
119+
if rowLevelTTL.ProcessorProgresses[i].ProcessorID == incomingProcProgress.ProcessorID {
120+
rowLevelTTL.ProcessorProgresses[i] = incomingProcProgress
121+
foundMatchingProcessor = true
122+
break
123+
}
124+
}
125+
if !foundMatchingProcessor {
126+
rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, incomingProcProgress)
127+
}
128+
129+
// Recompute job level counters from scratch.
130+
rowLevelTTL.JobDeletedRowCount = 0
131+
rowLevelTTL.JobProcessedSpanCount = 0
132+
totalSpanCount := int64(0)
133+
for i := range rowLevelTTL.ProcessorProgresses {
134+
pp := &rowLevelTTL.ProcessorProgresses[i]
135+
rowLevelTTL.JobDeletedRowCount += pp.DeletedRowCount
136+
rowLevelTTL.JobProcessedSpanCount += pp.ProcessedSpanCount
137+
totalSpanCount += pp.TotalSpanCount
138+
}
139+
140+
if totalSpanCount > rowLevelTTL.JobTotalSpanCount {
141+
return nil, errors.Errorf(
142+
"computed span total cannot exceed job total: computed=%d jobRecorded=%d",
143+
totalSpanCount, rowLevelTTL.JobTotalSpanCount)
144+
}
145+
146+
// Avoid the update if doing this too frequently.
147+
t.mu.Lock()
148+
defer t.mu.Unlock()
149+
processedDelta := rowLevelTTL.JobProcessedSpanCount - t.mu.lastSpanCount
150+
processorComplete := incomingProcProgress.ProcessedSpanCount == incomingProcProgress.TotalSpanCount
151+
firstProgressForProcessor := !foundMatchingProcessor
152+
153+
if !(processedDelta >= t.mu.updateEvery ||
154+
timeutil.Since(t.mu.lastUpdateTime) >= t.mu.updateEveryDuration ||
155+
processorComplete ||
156+
firstProgressForProcessor) {
157+
return nil, nil // Skip the update
158+
}
159+
t.mu.lastSpanCount = rowLevelTTL.JobProcessedSpanCount
160+
t.mu.lastUpdateTime = timeutil.Now()
161+
162+
newProgress := &jobspb.Progress{
163+
Details: &jobspb.Progress_RowLevelTTL{
164+
RowLevelTTL: rowLevelTTL,
165+
},
166+
Progress: &jobspb.Progress_FractionCompleted{
167+
FractionCompleted: float32(rowLevelTTL.JobProcessedSpanCount) /
168+
float32(rowLevelTTL.JobTotalSpanCount),
169+
},
170+
}
171+
return newProgress, nil
172+
}
173+
174+
// handleProgressUpdate implements the progressTracker interface.
175+
func (t *legacyProgressTracker) handleProgressUpdate(
176+
ctx context.Context, meta *execinfrapb.ProducerMetadata,
177+
) error {
178+
return t.job.NoTxn().Update(ctx, func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
179+
progress, err := t.refreshJobProgress(ctx, &md, meta)
180+
if err != nil {
181+
return err
182+
}
183+
if progress != nil {
184+
ju.UpdateProgress(progress)
185+
}
186+
return nil
187+
})
188+
}

0 commit comments

Comments
 (0)