Skip to content

Commit 63d659a

Browse files
committed
sql/inspect: add protected timestamp support to INSPECT
This change adds protected timestamp (PTS) record management to INSPECT jobs that use an AS OF SYSTEM TIME clause. Protected timestamps prevent garbage collection of historical data during the inspection process.

When an INSPECT job specifies an AS OF timestamp, the job now:
- Creates a protected timestamp record before beginning inspection
- Stores the record ID in the job details for tracking
- Automatically cleans up the record on job completion or failure

Note: once INSPECT can run without a fixed timestamp (see #148675), no PTS will be acquired in that case.

Closes #148573

Release note: none
Epic: CRDB-30356
1 parent 1345a57 commit 63d659a

File tree

7 files changed

+250
-1
lines changed

7 files changed

+250
-1
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,14 @@ message InspectDetails {
14751475

14761476
// AsOf specifies the timestamp at which the inspect checks should be performed.
14771477
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
1478+
1479+
// ProtectedTimestampRecord is the id of the protected timestamp record that
1480+
// prevents garbage collection of data at the AsOf timestamp. This is only
1481+
// set if a fixed timestamp was used for INSPECT (i.e. the AsOf timestamp is non-empty).
1482+
bytes protected_timestamp_record = 3 [
1483+
(gogoproto.customname) = "ProtectedTimestampRecord",
1484+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
1485+
];
14781486
}
14791487

14801488
message UpdateTableMetadataCacheDetails {}

pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {
5555
case jobspb.SchemaChangeDetails:
5656
v.ProtectedTimestampRecord = u
5757
return v
58+
case jobspb.InspectDetails:
59+
v.ProtectedTimestampRecord = u
60+
return v
5861
default:
5962
panic(errors.AssertionFailedf("not supported %T", details))
6063
}
@@ -68,6 +71,8 @@ func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID {
6871
return v.ProtectedTimestampRecord
6972
case jobspb.SchemaChangeDetails:
7073
return v.ProtectedTimestampRecord
74+
case jobspb.InspectDetails:
75+
return v.ProtectedTimestampRecord
7176
default:
7277
panic("not supported")
7378
}

pkg/sql/exec_util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,6 +2101,9 @@ type InspectTestingKnobs struct {
21012101
// OnInspectJobStart is called just before the inspect job begins execution.
21022102
// If it returns an error, the job fails immediately.
21032103
OnInspectJobStart func() error
2104+
// OnInspectAfterProtectedTimestamp is called after the protected timestamp
2105+
// has been created (if applicable). If it returns an error, the job fails.
2106+
OnInspectAfterProtectedTimestamp func() error
21042107
// InspectIssueLogger is an override to the default issue logger.
21052108
InspectIssueLogger interface{}
21062109
}

pkg/sql/inspect/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
deps = [
1919
"//pkg/jobs",
2020
"//pkg/jobs/jobspb",
21+
"//pkg/kv/kvserver/protectedts/ptpb",
2122
"//pkg/roachpb",
2223
"//pkg/security/username",
2324
"//pkg/settings",
@@ -72,6 +73,7 @@ go_test(
7273
"//pkg/jobs/jobspb",
7374
"//pkg/keys",
7475
"//pkg/kv",
76+
"//pkg/kv/kvserver/protectedts",
7577
"//pkg/roachpb",
7678
"//pkg/security/securityassets",
7779
"//pkg/security/securitytest",
@@ -83,6 +85,7 @@ go_test(
8385
"//pkg/sql/catalog/desctestutils",
8486
"//pkg/sql/execinfra",
8587
"//pkg/sql/execinfrapb",
88+
"//pkg/sql/isql",
8689
"//pkg/sql/rowenc",
8790
"//pkg/sql/sem/tree",
8891
"//pkg/sql/sessiondata",

pkg/sql/inspect/index_consistency_check.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ func (c *indexConsistencyCheck) Start(
196196
)
197197

198198
// Wrap the query with AS OF SYSTEM TIME to ensure it uses the specified timestamp
199-
// TODO(148573): use a protected timestamp record for this timestamp.
200199
queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", checkQuery, c.asOf.AsOfSystemTime())
201200

202201
// Store the query for error reporting

pkg/sql/inspect/inspect_job.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/jobs"
1212
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
1314
"github.com/cockroachdb/cockroach/pkg/roachpb"
1415
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1516
"github.com/cockroachdb/cockroach/pkg/sql"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1618
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1719
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
1820
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -21,6 +23,7 @@ import (
2123
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2224
"github.com/cockroachdb/cockroach/pkg/sql/types"
2325
"github.com/cockroachdb/cockroach/pkg/util/log"
26+
"github.com/cockroachdb/errors"
2427
)
2528

2629
type inspectResumer struct {
@@ -40,11 +43,24 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
4043
return err
4144
}
4245

46+
details := c.job.Details().(jobspb.InspectDetails)
47+
if len(details.Checks) == 0 {
48+
return nil
49+
}
50+
4351
pkSpans, err := c.getPrimaryIndexSpans(ctx, execCfg)
4452
if err != nil {
4553
return err
4654
}
4755

56+
if err := c.maybeProtectTimestamp(ctx, execCfg, details); err != nil {
57+
return err
58+
}
59+
60+
if err := c.maybeRunAfterProtectedTimestampHook(execCfg); err != nil {
61+
return err
62+
}
63+
4864
// TODO(149460): add a goroutine that will replan the job on topology changes
4965
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
5066
if err != nil {
@@ -55,13 +71,18 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
5571
return err
5672
}
5773

74+
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
75+
5876
return c.markJobComplete(ctx)
5977
}
6078

6179
// OnFailOrCancel implements the Resumer interface
6280
func (c *inspectResumer) OnFailOrCancel(
6381
ctx context.Context, execCtx interface{}, jobErr error,
6482
) error {
83+
jobExecCtx := execCtx.(sql.JobExecContext)
84+
execCfg := jobExecCtx.ExecCfg()
85+
c.maybeCleanupProtectedTimestamp(ctx, execCfg)
6586
return nil
6687
}
6788

@@ -77,6 +98,13 @@ func (c *inspectResumer) maybeRunOnJobStartHook(execCfg *sql.ExecutorConfig) err
7798
return execCfg.InspectTestingKnobs.OnInspectJobStart()
7899
}
79100

101+
func (c *inspectResumer) maybeRunAfterProtectedTimestampHook(execCfg *sql.ExecutorConfig) error {
102+
if execCfg.InspectTestingKnobs == nil || execCfg.InspectTestingKnobs.OnInspectAfterProtectedTimestamp == nil {
103+
return nil
104+
}
105+
return execCfg.InspectTestingKnobs.OnInspectAfterProtectedTimestamp()
106+
}
107+
80108
// getPrimaryIndexSpans returns the primary index spans for all tables involved in
81109
// the INSPECT job's checks.
82110
func (c *inspectResumer) getPrimaryIndexSpans(
@@ -202,6 +230,50 @@ func (c *inspectResumer) markJobComplete(ctx context.Context) error {
202230
)
203231
}
204232

233+
// maybeProtectTimestamp creates a protected timestamp record for the AsOf
234+
// timestamp to prevent garbage collection during the inspect operation.
235+
// If no AsOf timestamp is specified, this function does nothing.
236+
// The protection target includes all tables involved in the inspect checks.
237+
// Uses the jobsprotectedts.Manager to store the PTS ID in job details.
238+
func (c *inspectResumer) maybeProtectTimestamp(
239+
ctx context.Context, execCfg *sql.ExecutorConfig, details jobspb.InspectDetails,
240+
) error {
241+
// If we are not running a historical query, nothing to do here.
242+
if details.AsOf.IsEmpty() {
243+
return nil
244+
}
245+
246+
// Create a target for the specific tables involved in the inspect checks
247+
var tableIDSet catalog.DescriptorIDSet
248+
for _, check := range details.Checks {
249+
tableIDSet.Add(check.TableID)
250+
}
251+
target := ptpb.MakeSchemaObjectsTarget(tableIDSet.Ordered())
252+
253+
// Protect will store the PTS ID in job details.
254+
_, err := execCfg.ProtectedTimestampManager.Protect(ctx, c.job, target, details.AsOf)
255+
if err != nil {
256+
return errors.Wrapf(err, "failed to protect timestamp %s for INSPECT job %d", details.AsOf, c.job.ID())
257+
}
258+
259+
log.Dev.Infof(ctx, "protected timestamp created for INSPECT job %d at %s", c.job.ID(), details.AsOf)
260+
return nil
261+
}
262+
263+
// maybeCleanupProtectedTimestamp cleans up any protected timestamp record
264+
// associated with this job. If no protected timestamp was created, this
265+
// function does nothing.
266+
func (c *inspectResumer) maybeCleanupProtectedTimestamp(
267+
ctx context.Context, execCfg *sql.ExecutorConfig,
268+
) {
269+
details := c.job.Details().(jobspb.InspectDetails)
270+
if details.ProtectedTimestampRecord != nil {
271+
if err := execCfg.ProtectedTimestampManager.Unprotect(ctx, c.job); err != nil {
272+
log.Dev.Warningf(ctx, "failed to clean up protected timestamp: %v", err)
273+
}
274+
}
275+
}
276+
205277
func init() {
206278
createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
207279
return &inspectResumer{job: job}

pkg/sql/inspect/inspect_job_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/base"
16+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
1618
"github.com/cockroachdb/cockroach/pkg/sql"
19+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1720
"github.com/cockroachdb/cockroach/pkg/testutils"
1821
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1922
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -145,3 +148,159 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
145148
})
146149
}
147150
}
151+
152+
// TestInspectJobProtectedTimestamp verifies that INSPECT jobs properly create
153+
// and clean up protected timestamp records when using AS OF SYSTEM TIME.
154+
func TestInspectJobProtectedTimestamp(t *testing.T) {
155+
defer leaktest.AfterTest(t)()
156+
defer log.Scope(t).Close(t)
157+
158+
for _, tc := range []struct {
159+
desc string
160+
forceJobFailure bool
161+
expectedJobStatus string
162+
expectError bool
163+
}{
164+
{
165+
desc: "job success with cleanup",
166+
forceJobFailure: false,
167+
expectedJobStatus: "succeeded",
168+
expectError: false,
169+
},
170+
{
171+
desc: "job failure with cleanup",
172+
forceJobFailure: true,
173+
expectedJobStatus: "failed",
174+
expectError: true,
175+
},
176+
} {
177+
t.Run(tc.desc, func(t *testing.T) {
178+
var blockInspectExecution atomic.Bool
179+
var protectedTimestampCreated atomic.Bool
180+
181+
ctx := context.Background()
182+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
183+
Knobs: base.TestingKnobs{
184+
Inspect: &sql.InspectTestingKnobs{
185+
OnInspectAfterProtectedTimestamp: func() error {
186+
protectedTimestampCreated.Store(true)
187+
// Block execution until we've verified the protected timestamp
188+
for blockInspectExecution.Load() {
189+
time.Sleep(10 * time.Millisecond)
190+
}
191+
if tc.forceJobFailure {
192+
return errors.New("forced job failure for testing")
193+
}
194+
return nil
195+
},
196+
},
197+
},
198+
})
199+
defer s.Stopper().Stop(ctx)
200+
201+
runner := sqlutils.MakeSQLRunner(db)
202+
runner.Exec(t, `
203+
CREATE DATABASE db;
204+
SET enable_scrub_job = true;
205+
CREATE TABLE db.t (
206+
id INT PRIMARY KEY,
207+
val INT
208+
);
209+
CREATE INDEX i1 on db.t (val);
210+
INSERT INTO db.t VALUES (1, 2), (2, 3);`)
211+
212+
// Start blocking inspection execution
213+
blockInspectExecution.Store(true)
214+
215+
// Start INSPECT job with AS OF timestamp in a goroutine
216+
errCh := make(chan error, 1)
217+
go func() {
218+
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'")
219+
errCh <- err
220+
}()
221+
222+
// Wait for the protected timestamp hook to be called
223+
testutils.SucceedsSoon(t, func() error {
224+
if !protectedTimestampCreated.Load() {
225+
return errors.New("protected timestamp hook not called yet")
226+
}
227+
return nil
228+
})
229+
230+
// Get the job ID
231+
var jobID int64
232+
runner.QueryRow(t, `
233+
SELECT id
234+
FROM crdb_internal.system_jobs
235+
WHERE job_type = 'INSPECT' AND status = 'running'
236+
ORDER BY created DESC
237+
LIMIT 1
238+
`).Scan(&jobID)
239+
240+
// Load the job and get protected timestamp record
241+
execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
242+
job, err := execCfg.JobRegistry.LoadJob(ctx, jobspb.JobID(jobID))
243+
require.NoError(t, err)
244+
245+
details := job.Details().(jobspb.InspectDetails)
246+
require.NotNil(t, details.ProtectedTimestampRecord, "protected timestamp record should be set")
247+
protectedTSID := *details.ProtectedTimestampRecord
248+
249+
// Check that the protected timestamp record actually exists in the system
250+
var recordExists bool
251+
require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
252+
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
253+
_, err := pts.GetRecord(ctx, protectedTSID)
254+
if err != nil {
255+
if errors.Is(err, protectedts.ErrNotExists) {
256+
recordExists = false
257+
return nil
258+
}
259+
return err
260+
}
261+
recordExists = true
262+
return nil
263+
}))
264+
require.True(t, recordExists, "protected timestamp record should exist in the system")
265+
266+
// Allow the job to complete
267+
blockInspectExecution.Store(false)
268+
269+
// Wait for job to complete
270+
select {
271+
case err := <-errCh:
272+
if tc.expectError {
273+
require.Error(t, err, "INSPECT job should fail due to forced error")
274+
} else {
275+
require.NoError(t, err, "INSPECT job should complete successfully")
276+
}
277+
case <-time.After(30 * time.Second):
278+
t.Fatal("INSPECT job did not complete within timeout")
279+
}
280+
281+
// Verify job status
282+
var status string
283+
runner.QueryRow(t, `
284+
SELECT status
285+
FROM crdb_internal.system_jobs
286+
WHERE id = $1
287+
`, jobID).Scan(&status)
288+
require.Equal(t, tc.expectedJobStatus, status, "job should have expected status")
289+
290+
// Verify protected timestamp record is cleaned up
291+
testutils.SucceedsSoon(t, func() error {
292+
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
293+
pts := execCfg.ProtectedTimestampProvider.WithTxn(txn)
294+
_, err := pts.GetRecord(ctx, protectedTSID)
295+
if err != nil {
296+
if errors.Is(err, protectedts.ErrNotExists) {
297+
return nil // This is what we want
298+
}
299+
return err
300+
}
301+
return errors.New("protected timestamp record still exists")
302+
})
303+
})
304+
})
305+
}
306+
}

0 commit comments

Comments
 (0)