Skip to content

Commit 3089029

Browse files
authored
Merge pull request #154249 from spilchen/backport25.4-153919
release-25.4: sql/inspect: add protected timestamp support to INSPECT
2 parents a66bb53 + 3a80b9a commit 3089029

File tree

7 files changed

+250
-1
lines changed

7 files changed

+250
-1
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,6 +1464,14 @@ message InspectDetails {
14641464

14651465
// AsOf specifies the timestamp at which the inspect checks should be performed.
14661466
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
1467+
1468+
// ProtectedTimestampRecord is the id of the protected timestamp record that
1469+
// prevents garbage collection of data at the AsOf timestamp. This is only
1470+
// set if a fixed timestamp was used for INSPECT (i.e. the AsOf timestamp is non-empty).
1471+
bytes protected_timestamp_record = 3 [
1472+
(gogoproto.customname) = "ProtectedTimestampRecord",
1473+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
1474+
];
14671475
}
14681476

14691477
message UpdateTableMetadataCacheDetails {}

pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {
5555
case jobspb.SchemaChangeDetails:
5656
v.ProtectedTimestampRecord = u
5757
return v
58+
case jobspb.InspectDetails:
59+
v.ProtectedTimestampRecord = u
60+
return v
5861
default:
5962
panic(errors.AssertionFailedf("not supported %T", details))
6063
}
@@ -68,6 +71,8 @@ func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID {
6871
return v.ProtectedTimestampRecord
6972
case jobspb.SchemaChangeDetails:
7073
return v.ProtectedTimestampRecord
74+
case jobspb.InspectDetails:
75+
return v.ProtectedTimestampRecord
7176
default:
7277
panic("not supported")
7378
}

pkg/sql/exec_util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,6 +2101,9 @@ type InspectTestingKnobs struct {
21012101
// OnInspectJobStart is called just before the inspect job begins execution.
21022102
// If it returns an error, the job fails immediately.
21032103
OnInspectJobStart func() error
2104+
// OnInspectAfterProtectedTimestamp is called after the protected timestamp
2105+
// has been created (if applicable). If it returns an error, the job fails.
2106+
OnInspectAfterProtectedTimestamp func() error
21042107
// InspectIssueLogger is an override to the default issue logger.
21052108
InspectIssueLogger interface{}
21062109
}

pkg/sql/inspect/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
deps = [
1919
"//pkg/jobs",
2020
"//pkg/jobs/jobspb",
21+
"//pkg/kv/kvserver/protectedts/ptpb",
2122
"//pkg/roachpb",
2223
"//pkg/security/username",
2324
"//pkg/settings",
@@ -72,6 +73,7 @@ go_test(
7273
"//pkg/jobs/jobspb",
7374
"//pkg/keys",
7475
"//pkg/kv",
76+
"//pkg/kv/kvserver/protectedts",
7577
"//pkg/roachpb",
7678
"//pkg/security/securityassets",
7779
"//pkg/security/securitytest",
@@ -83,6 +85,7 @@ go_test(
8385
"//pkg/sql/catalog/desctestutils",
8486
"//pkg/sql/execinfra",
8587
"//pkg/sql/execinfrapb",
88+
"//pkg/sql/isql",
8689
"//pkg/sql/rowenc",
8790
"//pkg/sql/sem/tree",
8891
"//pkg/sql/sessiondata",

pkg/sql/inspect/index_consistency_check.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ func (c *indexConsistencyCheck) Start(
196196
)
197197

198198
// Wrap the query with AS OF SYSTEM TIME to ensure it uses the specified timestamp
199-
// TODO(148573): use a protected timestamp record for this timestamp.
200199
queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", checkQuery, c.asOf.AsOfSystemTime())
201200

202201
// Store the query for error reporting

pkg/sql/inspect/inspect_job.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/jobs"
1212
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
1415
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1516
"github.com/cockroachdb/cockroach/pkg/sql"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1618
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1719
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
1820
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -21,6 +23,7 @@ import (
2123
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2224
"github.com/cockroachdb/cockroach/pkg/sql/types"
2325
"github.com/cockroachdb/cockroach/pkg/util/log"
26+
"github.com/cockroachdb/errors"
2427
)
2528

2629
type inspectResumer struct {
@@ -40,11 +43,24 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
4043
return err
4144
}
4245

46+
details := c.job.Details().(jobspb.InspectDetails)
47+
if len(details.Checks) == 0 {
48+
return nil
49+
}
50+
4351
pkSpans, err := c.getPrimaryIndexSpans(ctx, execCfg)
4452
if err != nil {
4553
return err
4654
}
4755

56+
if err := c.maybeProtectTimestamp(ctx, execCfg, details); err != nil {
57+
return err
58+
}
59+
60+
if err := c.maybeRunAfterProtectedTimestampHook(execCfg); err != nil {
61+
return err
62+
}
63+
4864
// TODO(149460): add a goroutine that will replan the job on topology changes
4965
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
5066
if err != nil {
@@ -55,13 +71,18 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
5571
return err
5672
}
5773

74+
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
75+
5876
return c.markJobComplete(ctx)
5977
}
6078

6179
// OnFailOrCancel implements the Resumer interface. It releases any protected
// timestamp record still attached to the job. The cleanup helper only logs a
// warning when the release fails, so this method always returns nil.
6280
func (c *inspectResumer) OnFailOrCancel(
6381
ctx context.Context, execCtx interface{}, jobErr error,
6482
) error {
83+
jobExecCtx := execCtx.(sql.JobExecContext)
84+
execCfg := jobExecCtx.ExecCfg()
85+
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
6586
return nil
6687
}
6788

@@ -77,6 +98,13 @@ func (c *inspectResumer) maybeRunOnJobStartHook(execCfg *sql.ExecutorConfig) err
7798
return execCfg.InspectTestingKnobs.OnInspectJobStart()
7899
}
79100

101+
// maybeRunAfterProtectedTimestampHook invokes the
// OnInspectAfterProtectedTimestamp testing knob and returns its result. It
// returns nil when no inspect testing knobs are configured or the hook is unset.
func (c *inspectResumer) maybeRunAfterProtectedTimestampHook(execCfg *sql.ExecutorConfig) error {
102+
if execCfg.InspectTestingKnobs == nil || execCfg.InspectTestingKnobs.OnInspectAfterProtectedTimestamp == nil {
103+
return nil
104+
}
105+
return execCfg.InspectTestingKnobs.OnInspectAfterProtectedTimestamp()
106+
}
107+
80108
// getPrimaryIndexSpans returns the primary index spans for all tables involved in
81109
// the INSPECT job's checks.
82110
func (c *inspectResumer) getPrimaryIndexSpans(
@@ -202,6 +230,50 @@ func (c *inspectResumer) markJobComplete(ctx context.Context) error {
202230
)
203231
}
204232

233+
// maybeProtectTimestamp creates a protected timestamp record for the AsOf
234+
// timestamp to prevent garbage collection during the inspect operation.
235+
// If no AsOf timestamp is specified, this function does nothing and returns nil.
236+
// The protection target includes all tables involved in the inspect checks.
237+
// Uses the jobsprotectedts.Manager to store the PTS ID in job details.
// A failure from Protect is returned wrapped with the timestamp and job ID.
// NOTE(review): Resume can presumably run more than once for the same job
// (e.g. after a pause); confirm that Protect tolerates a job that already
// carries a record.
238+
func (c *inspectResumer) maybeProtectTimestamp(
239+
ctx context.Context, execCfg *sql.ExecutorConfig, details jobspb.InspectDetails,
240+
) error {
241+
// If we are not running a historical query, nothing to do here.
242+
if details.AsOf.IsEmpty() {
243+
return nil
244+
}
245+
246+
// Create a target for the specific tables involved in the inspect checks.
// The IDs go through a DescriptorIDSet first, presumably so a table that
// appears in several checks is only listed once in the target.
247+
var tableIDSet catalog.DescriptorIDSet
248+
for _, check := range details.Checks {
249+
tableIDSet.Add(check.TableID)
250+
}
251+
target := ptpb.MakeSchemaObjectsTarget(tableIDSet.Ordered())
252+
253+
// Protect will store the PTS ID in job details. Its first return value is
// not needed here and is discarded.
254+
_, err := execCfg.ProtectedTimestampManager.Protect(ctx, c.job, target, details.AsOf)
255+
if err != nil {
256+
return errors.Wrapf(err, "failed to protect timestamp %s for INSPECT job %d", details.AsOf, c.job.ID())
257+
}
258+
259+
log.Dev.Infof(ctx, "protected timestamp created for INSPECT job %d at %s", c.job.ID(), details.AsOf)
260+
return nil
261+
}
262+
263+
// maybeCleanupProtectedTimestamp cleans up any protected timestamp record
264+
// associated with this job. If no protected timestamp was created, this
265+
// function does nothing. Cleanup is best-effort: an error from Unprotect is
// logged as a warning and is not returned to the caller.
266+
func (c *inspectResumer) maybeCleanupProtectedTimestamp(
267+
ctx context.Context, execCfg *sql.ExecutorConfig,
268+
) {
269+
// Read the details from the job itself; the record field is only non-nil
// once a protected timestamp has been attached to the job.
details := c.job.Details().(jobspb.InspectDetails)
270+
if details.ProtectedTimestampRecord != nil {
271+
if err := execCfg.ProtectedTimestampManager.Unprotect(ctx, c.job); err != nil {
272+
log.Dev.Warningf(ctx, "failed to clean up protected timestamp: %v", err)
273+
}
274+
}
275+
}
276+
205277
func init() {
206278
createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
207279
return &inspectResumer{job: job}

pkg/sql/inspect/inspect_job_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/base"
16+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
1618
"github.com/cockroachdb/cockroach/pkg/sql"
19+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1720
"github.com/cockroachdb/cockroach/pkg/testutils"
1821
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1922
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -145,3 +148,159 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
145148
})
146149
}
147150
}
151+
152+
// TestInspectJobProtectedTimestamp verifies that INSPECT jobs properly create
153+
// and clean up protected timestamp records when using AS OF SYSTEM TIME.
// Each subtest starts its own server, pauses the job inside the
// OnInspectAfterProtectedTimestamp hook, checks that the record referenced by
// the job details exists, then releases the job and checks that the record is
// removed. One subtest lets the job succeed; the other makes the hook return
// an error so the job fails.
154+
func TestInspectJobProtectedTimestamp(t *testing.T) {
155+
defer leaktest.AfterTest(t)()
156+
defer log.Scope(t).Close(t)
157+
158+
for _, tc := range []struct {
159+
desc string
160+
forceJobFailure bool
161+
expectedJobStatus string
162+
expectError bool
163+
}{
164+
{
165+
desc: "job success with cleanup",
166+
forceJobFailure: false,
167+
expectedJobStatus: "succeeded",
168+
expectError: false,
169+
},
170+
{
171+
desc: "job failure with cleanup",
172+
forceJobFailure: true,
173+
expectedJobStatus: "failed",
174+
expectError: true,
175+
},
176+
} {
177+
t.Run(tc.desc, func(t *testing.T) {
178+
// blockInspectExecution holds the job inside the hook while true;
// protectedTimestampCreated records that the hook has been reached.
var blockInspectExecution atomic.Bool
179+
var protectedTimestampCreated atomic.Bool
180+
181+
ctx := context.Background()
182+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
183+
Knobs: base.TestingKnobs{
184+
Inspect: &sql.InspectTestingKnobs{
185+
OnInspectAfterProtectedTimestamp: func() error {
186+
protectedTimestampCreated.Store(true)
187+
// Block execution until we've verified the protected timestamp
// NOTE(review): this loop has no exit other than the flag being
// cleared. If an assertion fails while the job is blocked, the flag
// is never reset and the hook keeps polling, which may stall server
// shutdown — consider clearing the flag in a defer.
188+
for blockInspectExecution.Load() {
189+
time.Sleep(10 * time.Millisecond)
190+
}
191+
if tc.forceJobFailure {
192+
return errors.New("forced job failure for testing")
193+
}
194+
return nil
195+
},
196+
},
197+
},
198+
})
199+
defer s.Stopper().Stop(ctx)
200+
201+
runner := sqlutils.MakeSQLRunner(db)
202+
runner.Exec(t, `
203+
CREATE DATABASE db;
204+
SET enable_scrub_job = true;
205+
CREATE TABLE db.t (
206+
id INT PRIMARY KEY,
207+
val INT
208+
);
209+
CREATE INDEX i1 on db.t (val);
210+
INSERT INTO db.t VALUES (1, 2), (2, 3);`)
211+
212+
// Start blocking inspection execution
213+
blockInspectExecution.Store(true)
214+
215+
// Start INSPECT job with AS OF timestamp in a goroutine
// The channel has capacity 1 so the goroutine can deliver its result and
// exit even if nothing is receiving at that moment.
216+
errCh := make(chan error, 1)
217+
go func() {
218+
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'")
219+
errCh <- err
220+
}()
221+
222+
// Wait for the protected timestamp hook to be called
223+
testutils.SucceedsSoon(t, func() error {
224+
if !protectedTimestampCreated.Load() {
225+
return errors.New("protected timestamp hook not called yet")
226+
}
227+
return nil
228+
})
229+
230+
// Get the job ID: the most recently created INSPECT job that is still running
231+
var jobID int64
232+
runner.QueryRow(t, `
233+
SELECT id
234+
FROM crdb_internal.system_jobs
235+
WHERE job_type = 'INSPECT' AND status = 'running'
236+
ORDER BY created DESC
237+
LIMIT 1
238+
`).Scan(&jobID)
239+
240+
// Load the job and get protected timestamp record
241+
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
242+
job, err := execCfg.JobRegistry.LoadJob(ctx, jobspb.JobID(jobID))
243+
require.NoError(t, err)
244+
245+
details := job.Details().(jobspb.InspectDetails)
246+
require.NotNil(t, details.ProtectedTimestampRecord, "protected timestamp record should be set")
247+
protectedTSID := *details.ProtectedTimestampRecord
248+
249+
// Check that the protected timestamp record actually exists in the system
// ErrNotExists is translated into recordExists=false so the assertion
// below reports it; any other error fails the transaction itself.
250+
var recordExists bool
251+
require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
252+
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
253+
_, err := pts.GetRecord(ctx, protectedTSID)
254+
if err != nil {
255+
if errors.Is(err, protectedts.ErrNotExists) {
256+
recordExists = false
257+
return nil
258+
}
259+
return err
260+
}
261+
recordExists = true
262+
return nil
263+
}))
264+
require.True(t, recordExists, "protected timestamp record should exist in the system")
265+
266+
// Allow the job to complete
267+
blockInspectExecution.Store(false)
268+
269+
// Wait for job to complete
// The statement result arrives on errCh; give up after 30 seconds.
270+
select {
271+
case err := <-errCh:
272+
if tc.expectError {
273+
require.Error(t, err, "INSPECT job should fail due to forced error")
274+
} else {
275+
require.NoError(t, err, "INSPECT job should complete successfully")
276+
}
277+
case <-time.After(30 * time.Second):
278+
t.Fatal("INSPECT job did not complete within timeout")
279+
}
280+
281+
// Verify job status
282+
var status string
283+
runner.QueryRow(t, `
284+
SELECT status
285+
FROM crdb_internal.system_jobs
286+
WHERE id = $1
287+
`, jobID).Scan(&status)
288+
require.Equal(t, tc.expectedJobStatus, status, "job should have expected status")
289+
290+
// Verify protected timestamp record is cleaned up
// The lookup is retried until it reports ErrNotExists.
291+
testutils.SucceedsSoon(t, func() error {
292+
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
293+
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
294+
_, err := pts.GetRecord(ctx, protectedTSID)
295+
if err != nil {
296+
if errors.Is(err, protectedts.ErrNotExists) {
297+
return nil // This is what we want
298+
}
299+
return err
300+
}
301+
return errors.New("protected timestamp record still exists")
302+
})
303+
})
304+
})
305+
}
306+
}

0 commit comments

Comments
 (0)