Skip to content

Commit a62155f

Browse files
committed
sql/inspect: introduce new INSPECT processor into DistSQL flow
This change integrates a new INSPECT processor into the DistSQL framework. The job coordinator for INSPECT now builds a distributed plan across the key spans of a single table (currently only supports one table). The plan is executed across nodes based on the partitioning of the table, invoking the new INSPECT processor on each participating node. At present, the processor is a no-op that simply returns success, laying groundwork for future enhancements. Informs: #148683 Epic: CRDB-30356 Release note: none
1 parent ac3b2b7 commit a62155f

File tree

23 files changed

+503
-77
lines changed

23 files changed

+503
-77
lines changed

.github/CODEOWNERS

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
/pkg/sql/execinfrapb/processors_bulk_io.proto @cockroachdb/disaster-recovery
6363
/pkg/sql/execinfrapb/processors_changefeeds.proto @cockroachdb/cdc-prs
6464
/pkg/sql/execinfrapb/processors_ttl.proto @cockroachdb/sql-foundations
65+
/pkg/sql/execinfrapb/processors_inspect.proto @cockroachdb/sql-foundations
6566
/pkg/sql/exec_factory_util.go @cockroachdb/sql-queries-prs
6667
#!/pkg/sql/exec_log*.go @cockroachdb/sql-queries-noreview
6768
#!/pkg/sql/exec_util*.go @cockroachdb/sql-queries-noreview
@@ -106,6 +107,7 @@
106107
/pkg/sql/tests/rsg_test.go @cockroachdb/sql-foundations
107108
/pkg/sql/ttl @cockroachdb/sql-foundations
108109
/pkg/sql/spanutils/ @cockroachdb/sql-foundations
110+
/pkg/sql/inspect/ @cockroachdb/sql-foundations
109111

110112
/pkg/sql/syntheticprivilege/ @cockroachdb/sql-foundations
111113
/pkg/sql/syntheticprivilegecache/ @cockroachdb/sql-foundations

pkg/jobs/jobspb/jobs.proto

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,7 +1405,42 @@ message HotRangesLoggerDetails {
14051405
}
14061406

14071407
message InspectDetails {
1408+
// Check represents a single validation task to perform as part of an INSPECT
1409+
// job. Each check targets a specific table or index and is associated with a
1410+
// validation type.
1411+
message Check {
1412+
enum InspectCheckType {
1413+
option (gogoproto.goproto_enum_prefix) = false;
1414+
option (gogoproto.goproto_enum_stringer) = false;
1415+
1416+
// UNSPECIFIED indicates an unset check type. This is invalid.
1417+
INSPECT_CHECK_UNSPECIFIED = 0 [(gogoproto.enumvalue_customname) = "InspectCheckUnspecified"];
1418+
1419+
// INDEX_CONSISTENCY performs validation between primary and secondary indexes
1420+
// to detect missing or dangling index entries.
1421+
INSPECT_CHECK_INDEX_CONSISTENCY = 1 [(gogoproto.enumvalue_customname) = "InspectCheckIndexConsistency"];
1422+
}
1423+
1424+
// Type is the kind of validation to perform.
1425+
InspectCheckType type = 1;
1426+
1427+
// TableID identifies the table this check applies to.
1428+
uint32 table_id = 2 [
1429+
(gogoproto.customname) = "TableID",
1430+
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
1431+
];
1432+
1433+
// IndexID identifies the specific index to check, if applicable. Only
1434+
// relevant for check types that operate at the index level. Should be unset
1435+
// or ignored for table-level checks.
1436+
uint32 index_id = 3 [
1437+
(gogoproto.customname) = "IndexID",
1438+
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"
1439+
];
1440+
}
14081441

1442+
// Checks is the list of individual checks this job will perform.
1443+
repeated Check checks = 1;
14091444
}
14101445

14111446
message UpdateTableMetadataCacheDetails {}

pkg/sql/colexec/colbuilder/execplan.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, core *execinfrapb.ProcessorCo
320320
return errIndexBackfillMergerWrap
321321
case core.Ttl != nil:
322322
return errCoreNotWorthWrapping
323+
case core.Inspect != nil:
324+
return errCoreNotWorthWrapping
323325
case core.HashGroupJoiner != nil:
324326
case core.GenerativeSplitAndScatter != nil:
325327
return errCoreNotWorthWrapping

pkg/sql/distsql_leaf_txn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ func planSafeForReducedLeaf(processors []physicalplan.Processor) bool {
233233
return unoptimizedProcessor
234234
case core.Ttl != nil:
235235
return unoptimizedProcessor
236+
case core.Inspect != nil:
237+
return unoptimizedProcessor
236238
case core.HashGroupJoiner != nil:
237239
if unsafeExpr(core.HashGroupJoiner.HashJoinerSpec.OnExpr) {
238240
return unsafeProcessor

pkg/sql/execinfrapb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ proto_library(
8282
"processors_base.proto",
8383
"processors_bulk_io.proto",
8484
"processors_changefeeds.proto",
85+
"processors_inspect.proto",
8586
"processors_sql.proto",
8687
"processors_table_stats.proto",
8788
"processors_ttl.proto",

pkg/sql/execinfrapb/flow_diagram.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,13 @@ func (s *TTLSpec) summary() (string, []string) {
739739
}
740740
}
741741

742+
// summary implements the diagramCellType interface.
743+
func (s *InspectSpec) summary() (string, []string) {
744+
return "INSPECT", []string{
745+
fmt.Sprintf("JobID: %d", s.JobID),
746+
}
747+
}
748+
742749
// summary implements the diagramCellType interface.
743750
func (s *HashGroupJoinerSpec) summary() (string, []string) {
744751
_, details := s.HashJoinerSpec.summary()

pkg/sql/execinfrapb/processors.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import "sql/execinfrapb/data.proto";
1818
import "sql/execinfrapb/processors_base.proto";
1919
import "sql/execinfrapb/processors_sql.proto";
2020
import "sql/execinfrapb/processors_ttl.proto";
21+
import "sql/execinfrapb/processors_inspect.proto";
2122
import "sql/execinfrapb/processors_bulk_io.proto";
2223
import "sql/execinfrapb/processors_changefeeds.proto";
2324
import "sql/execinfrapb/processors_table_stats.proto";
@@ -126,9 +127,10 @@ message ProcessorCoreUnion {
126127
optional VectorSearchSpec vectorSearch = 47;
127128
optional VectorMutationSearchSpec vectorMutationSearch = 48;
128129
optional CompactBackupsSpec compactBackups = 49;
130+
optional InspectSpec inspect = 50;
129131

130132
reserved 6, 12, 14, 17, 18, 19, 20, 32;
131-
// NEXT ID: 50.
133+
// NEXT ID: 51.
132134
}
133135

134136
// NoopCoreSpec indicates a "no-op" processor core. This is used when we just
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
//
6+
// Processor definitions for distributed SQL APIs. See
7+
// docs/RFCS/distributed_sql.md.
8+
// All the concepts here are "physical plan" concepts.
9+
10+
syntax = "proto2";
11+
// Beware! This package name must not be changed, even though it doesn't match
12+
// the Go package name, because it defines the Protobuf message names which
13+
// can't be changed without breaking backward compatibility.
14+
package cockroach.sql.distsqlrun;
15+
option go_package = "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb";
16+
17+
import "gogoproto/gogo.proto";
18+
import "roachpb/data.proto";
19+
import "jobs/jobspb/jobs.proto";
20+
21+
message InspectSpec {
22+
// JobID of the job that ran the inspect processor.
23+
optional int64 job_id = 1 [
24+
(gogoproto.nullable) = false,
25+
(gogoproto.customname) = "JobID",
26+
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID"
27+
];
28+
29+
// InspectDetails are the details of the job that ran the inspect processor.
30+
optional jobs.jobspb.InspectDetails inspect_details = 2 [
31+
(gogoproto.nullable) = false,
32+
(gogoproto.customname) = "InspectDetails"
33+
];
34+
35+
// Spans determine which records are processed by which nodes in the DistSQL
36+
// flow.
37+
repeated roachpb.Span spans = 3 [(gogoproto.nullable) = false];
38+
}

pkg/sql/inspect/BUILD.bazel

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,29 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "inspect",
5-
srcs = ["inspect_job.go"],
5+
srcs = [
6+
"inspect_job.go",
7+
"inspect_processor.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/sql/inspect",
710
visibility = ["//visibility:public"],
811
deps = [
912
"//pkg/jobs",
1013
"//pkg/jobs/jobspb",
14+
"//pkg/roachpb",
1115
"//pkg/settings/cluster",
1216
"//pkg/sql",
17+
"//pkg/sql/catalog/descs",
18+
"//pkg/sql/execinfra",
19+
"//pkg/sql/execinfrapb",
1320
"//pkg/sql/isql",
21+
"//pkg/sql/physicalplan",
22+
"//pkg/sql/rowexec",
23+
"//pkg/sql/sem/tree",
24+
"//pkg/sql/types",
1425
"//pkg/util/log",
26+
"//pkg/util/tracing",
27+
"@com_github_cockroachdb_errors//:errors",
1528
],
1629
)
1730

@@ -27,6 +40,7 @@ go_test(
2740
"//pkg/security/securitytest",
2841
"//pkg/server",
2942
"//pkg/sql",
43+
"//pkg/testutils",
3044
"//pkg/testutils/serverutils",
3145
"//pkg/testutils/sqlutils",
3246
"//pkg/testutils/testcluster",

pkg/sql/inspect/inspect_job.go

Lines changed: 149 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,17 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/jobs"
1212
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
1314
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1415
"github.com/cockroachdb/cockroach/pkg/sql"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
17+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1518
"github.com/cockroachdb/cockroach/pkg/sql/isql"
19+
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
20+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
21+
"github.com/cockroachdb/cockroach/pkg/sql/types"
1622
"github.com/cockroachdb/cockroach/pkg/util/log"
23+
"github.com/cockroachdb/errors"
1724
)
1825

1926
type inspectResumer struct {
@@ -34,25 +41,26 @@ func (c *inspectResumer) Resume(ctx context.Context, execCtx interface{}) error
3441
knobs = *inspectKnobs
3542
}
3643

37-
if knobs.OnInspectJobStart != nil {
38-
if err := knobs.OnInspectJobStart(); err != nil {
39-
return err
40-
}
44+
if err := maybeRunOnJobStartHook(knobs); err != nil {
45+
return err
4146
}
4247

43-
if err := c.job.NoTxn().Update(ctx,
44-
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
45-
progress := md.Progress
46-
progress.Progress = &jobspb.Progress_FractionCompleted{
47-
FractionCompleted: 1,
48-
}
49-
ju.UpdateProgress(progress)
50-
return nil
51-
},
52-
); err != nil {
48+
pkSpans, err := c.getPrimaryIndexSpans(ctx, execCfg)
49+
if err != nil {
5350
return err
5451
}
55-
return nil
52+
53+
// TODO(149460): add a goroutine that will replan the job on topology changes
54+
plan, planCtx, err := c.planInspectProcessors(ctx, jobExecCtx, pkSpans)
55+
if err != nil {
56+
return err
57+
}
58+
59+
if err := c.runInspectPlan(ctx, jobExecCtx, planCtx, plan); err != nil {
60+
return err
61+
}
62+
63+
return c.markJobComplete(ctx)
5664
}
5765

5866
// OnFailOrCancel implements the Resumer interface
@@ -67,6 +75,132 @@ func (c *inspectResumer) CollectProfile(ctx context.Context, execCtx interface{}
6775
return nil
6876
}
6977

78+
func maybeRunOnJobStartHook(knobs sql.InspectTestingKnobs) error {
79+
if knobs.OnInspectJobStart != nil {
80+
return knobs.OnInspectJobStart()
81+
}
82+
return nil
83+
}
84+
85+
// getPrimaryIndexSpans returns the primary index spans for all tables involved in
86+
// the INSPECT job's checks.
87+
func (c *inspectResumer) getPrimaryIndexSpans(
88+
ctx context.Context, execCfg *sql.ExecutorConfig,
89+
) ([]roachpb.Span, error) {
90+
details := c.job.Details().(jobspb.InspectDetails)
91+
92+
spans := make([]roachpb.Span, 0, len(details.Checks))
93+
err := execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
94+
for i := range details.Checks {
95+
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.Checks[i].TableID)
96+
if err != nil {
97+
return err
98+
}
99+
spans = append(spans, desc.PrimaryIndexSpan(execCfg.Codec))
100+
}
101+
return nil
102+
})
103+
return spans, err
104+
}
105+
106+
// planInspectProcessors constructs the physical plan for the INSPECT job by
107+
// partitioning the given primary index spans across all nodes in the cluster.
108+
// Each processor will be assigned one or more spans to run their checks on.
109+
func (c *inspectResumer) planInspectProcessors(
110+
ctx context.Context, jobExecCtx sql.JobExecContext, entirePKSpans []roachpb.Span,
111+
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
112+
if len(entirePKSpans) > 1 {
113+
return nil, nil, errors.AssertionFailedf("we only support one check: %d", len(entirePKSpans))
114+
}
115+
distSQLPlanner := jobExecCtx.DistSQLPlanner()
116+
planCtx, _, err := distSQLPlanner.SetupAllNodesPlanning(ctx, jobExecCtx.ExtendedEvalContext(), jobExecCtx.ExecCfg())
117+
if err != nil {
118+
return nil, nil, err
119+
}
120+
121+
spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, entirePKSpans, sql.PartitionSpansBoundDefault)
122+
if err != nil {
123+
return nil, nil, err
124+
}
125+
126+
jobID := c.job.ID()
127+
newProcessorSpec := func(spans []roachpb.Span) *execinfrapb.InspectSpec {
128+
return &execinfrapb.InspectSpec{
129+
JobID: jobID,
130+
Spans: spans,
131+
}
132+
}
133+
134+
// Set up a one-stage plan with one proc per input spec.
135+
processorCorePlacements := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions))
136+
for i, spanPartition := range spanPartitions {
137+
processorCorePlacements[i].SQLInstanceID = spanPartition.SQLInstanceID
138+
processorCorePlacements[i].Core.Inspect = newProcessorSpec(spanPartition.Spans)
139+
}
140+
141+
physicalPlan := planCtx.NewPhysicalPlan()
142+
physicalPlan.AddNoInputStage(
143+
processorCorePlacements,
144+
execinfrapb.PostProcessSpec{},
145+
[]*types.T{},
146+
execinfrapb.Ordering{},
147+
nil, /* finalizeLastStageCb */
148+
)
149+
physicalPlan.PlanToStreamColMap = []int{}
150+
151+
sql.FinalizePlan(ctx, planCtx, physicalPlan)
152+
return physicalPlan, planCtx, nil
153+
}
154+
155+
// runInspectPlan executes the distributed physical plan for the INSPECT job.
156+
// It sets up a metadata-only DistSQL receiver to collect any execution errors,
157+
// then runs the plan using the provided planning context and evaluation context.
158+
// This function returns any error surfaced via metadata from the processors.
159+
func (c *inspectResumer) runInspectPlan(
160+
ctx context.Context,
161+
jobExecCtx sql.JobExecContext,
162+
planCtx *sql.PlanningCtx,
163+
plan *sql.PhysicalPlan,
164+
) error {
165+
execCfg := jobExecCtx.ExecCfg()
166+
167+
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter()
168+
169+
distSQLReceiver := sql.MakeDistSQLReceiver(
170+
ctx,
171+
metadataCallbackWriter,
172+
tree.Rows,
173+
execCfg.RangeDescriptorCache,
174+
nil, /* txn */
175+
nil, /* clockUpdater */
176+
jobExecCtx.ExtendedEvalContext().Tracing,
177+
)
178+
defer distSQLReceiver.Release()
179+
180+
distSQLPlanner := jobExecCtx.DistSQLPlanner()
181+
182+
// Copy the eval.Context, as dsp.Run() might change it.
183+
evalCtxCopy := jobExecCtx.ExtendedEvalContext().Context.Copy()
184+
185+
distSQLPlanner.Run(ctx, planCtx, nil /* txn */, plan,
186+
distSQLReceiver, evalCtxCopy, nil /* finishedSetupFn */)
187+
return metadataCallbackWriter.Err()
188+
}
189+
190+
func (c *inspectResumer) markJobComplete(ctx context.Context) error {
191+
// TODO(148297): add fine-grained progress reporting
192+
return c.job.NoTxn().Update(ctx,
193+
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
194+
progress := md.Progress
195+
progress.Progress = &jobspb.Progress_FractionCompleted{
196+
FractionCompleted: 1,
197+
}
198+
ju.UpdateProgress(progress)
199+
return nil
200+
},
201+
)
202+
}
203+
70204
func init() {
71205
createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
72206
return &inspectResumer{job: job}

0 commit comments

Comments
 (0)