Skip to content

Commit 7dd49da

Browse files
craig[bot]bghal
andcommitted
Merge #152304
152304: sql: report inspect errors r=bghal a=bghal sql: report INSPECT errors to table The inspect job logs errors. This change persists the errors to the `system.inspect_errors` table where they will be viewed with the `SHOW INSPECT ERRORS` command. Fixes: #148301 Epic: CRDB-30356 Release note: None Co-authored-by: Brendan Gerrity <[email protected]>
2 parents e5b76a4 + 903c1af commit 7dd49da

File tree

7 files changed

+227
-19
lines changed

7 files changed

+227
-19
lines changed

pkg/sql/inspect/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ go_library(
55
srcs = [
66
"index_consistency_check.go",
77
"inspect_job.go",
8+
"inspect_logger.go",
89
"inspect_processor.go",
910
"issue.go",
1011
"log_sink.go",
1112
"runner.go",
1213
"span_source.go",
14+
"table_sink.go",
1315
],
1416
importpath = "github.com/cockroachdb/cockroach/pkg/sql/inspect",
1517
visibility = ["//visibility:public"],
@@ -58,6 +60,7 @@ go_test(
5860
"issue_test.go",
5961
"main_test.go",
6062
"runner_test.go",
63+
"table_sink_test.go",
6164
],
6265
embed = [":inspect"],
6366
exec_properties = select({
@@ -66,6 +69,7 @@ go_test(
6669
}),
6770
deps = [
6871
"//pkg/base",
72+
"//pkg/jobs/jobspb",
6973
"//pkg/keys",
7074
"//pkg/kv",
7175
"//pkg/roachpb",
@@ -75,6 +79,7 @@ go_test(
7579
"//pkg/settings/cluster",
7680
"//pkg/sql",
7781
"//pkg/sql/catalog",
82+
"//pkg/sql/catalog/descs",
7883
"//pkg/sql/catalog/desctestutils",
7984
"//pkg/sql/execinfra",
8085
"//pkg/sql/execinfrapb",

pkg/sql/inspect/inspect_logger.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/errors"
12+
)
13+
14+
// inspectLogger records issues found by inspect checks. Implementations of this
15+
// interface define how inspectIssue results are handled.
16+
type inspectLogger interface {
17+
// logIssue records an inspectIssue found by a check.
18+
logIssue(ctx context.Context, issue *inspectIssue) error
19+
20+
// hasIssues returns true if any issues have been logged.
21+
hasIssues() bool
22+
}
23+
24+
// inspectLoggers manages a collection of inspectLogger instances.
25+
type inspectLoggers []inspectLogger
26+
27+
var _ inspectLogger = inspectLoggers{}
28+
29+
func (l inspectLoggers) logIssue(ctx context.Context, issue *inspectIssue) error {
30+
var retErr error
31+
32+
for _, logger := range l {
33+
if err := logger.logIssue(ctx, issue); err != nil {
34+
retErr = errors.CombineErrors(retErr, err)
35+
}
36+
}
37+
return retErr
38+
}
39+
40+
func (l inspectLoggers) hasIssues() bool {
41+
for _, logger := range l {
42+
if logger.hasIssues() {
43+
return true
44+
}
45+
}
46+
return false
47+
}

pkg/sql/inspect/inspect_processor.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,21 @@ func getProcessorConcurrency(flowCtx *execinfra.FlowCtx) int {
167167
}
168168

169169
// getInspectLogger returns a logger for the inspect processor.
170-
func getInspectLogger(flowCtx *execinfra.FlowCtx) inspectLogger {
170+
func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) inspectLogger {
171+
loggers := inspectLoggers{
172+
&logSink{},
173+
&tableSink{
174+
db: flowCtx.Cfg.DB,
175+
jobID: jobID,
176+
},
177+
}
178+
171179
knobs := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig).InspectTestingKnobs
172-
if knobs == nil || knobs.InspectIssueLogger == nil {
173-
// TODO(148301): Implement a proper logger that writes to system.inspect_errors.
174-
return &logSink{}
180+
if knobs != nil && knobs.InspectIssueLogger != nil {
181+
loggers = append(loggers, knobs.InspectIssueLogger.(inspectLogger))
175182
}
176-
return knobs.InspectIssueLogger.(inspectLogger)
183+
184+
return loggers
177185
}
178186

179187
// processSpan executes all configured inspect checks against a single span.
@@ -225,14 +233,15 @@ func newInspectProcessor(
225233
if err != nil {
226234
return nil, err
227235
}
236+
228237
return &inspectProcessor{
229238
spec: spec,
230239
processorID: processorID,
231240
flowCtx: flowCtx,
232241
checkFactories: checkFactories,
233242
cfg: flowCtx.Cfg,
234243
spanSrc: newSliceSpanSource(spec.Spans),
235-
logger: getInspectLogger(flowCtx),
244+
logger: getInspectLogger(flowCtx, spec.JobID),
236245
concurrency: getProcessorConcurrency(flowCtx),
237246
}, nil
238247
}

pkg/sql/inspect/log_sink.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,26 @@ package inspect
77

88
import (
99
"context"
10+
"sync/atomic"
1011

1112
"github.com/cockroachdb/cockroach/pkg/util/log"
1213
)
1314

1415
// logSink will report any inspect errors directly to cockroach.log.
1516
type logSink struct {
16-
foundIssue bool
17+
foundIssue atomic.Bool
1718
}
1819

1920
var _ inspectLogger = &logSink{}
2021

2122
// logIssue implements the inspectLogger interface.
2223
func (c *logSink) logIssue(ctx context.Context, issue *inspectIssue) error {
23-
c.foundIssue = true
24+
c.foundIssue.Store(true)
2425
log.Dev.Errorf(ctx, "inspect issue: %+v", issue)
2526
return nil
2627
}
2728

2829
// hasIssues implements the inspectLogger interface.
2930
func (c *logSink) hasIssues() bool {
30-
return c.foundIssue
31+
return c.foundIssue.Load()
3132
}

pkg/sql/inspect/runner.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,6 @@ type inspectCheck interface {
4242
Close(ctx context.Context) error
4343
}
4444

45-
// inspectLogger records issues found by inspect checks. Implementations of this
46-
// interface define how inspectIssue results are handled.
47-
type inspectLogger interface {
48-
// logIssue records an inspectIssue found by a check.
49-
logIssue(ctx context.Context, issue *inspectIssue) error
50-
51-
// hasIssues returns true if any issues have been logged.
52-
hasIssues() bool
53-
}
54-
5545
// inspectRunner coordinates the execution of a set of inspectChecks.
5646
//
5747
// It manages the lifecycle of each check, including initialization,

pkg/sql/inspect/table_sink.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"context"
10+
gojson "encoding/json"
11+
"sync/atomic"
12+
13+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
14+
"github.com/cockroachdb/cockroach/pkg/security/username"
15+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
16+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
17+
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
18+
)
19+
20+
// tableSink will report any inspect errors directly to system.inspect_errors.
21+
type tableSink struct {
22+
foundIssue atomic.Bool
23+
db descs.DB
24+
jobID jobspb.JobID
25+
}
26+
27+
var _ inspectLogger = &tableSink{}
28+
29+
// logIssue implements the inspectLogger interface.
30+
func (c *tableSink) logIssue(ctx context.Context, issue *inspectIssue) error {
31+
c.foundIssue.Store(true)
32+
33+
detailsBytes, err := gojson.Marshal(issue.Details)
34+
if err != nil {
35+
return err
36+
}
37+
38+
executor := c.db.Executor()
39+
40+
if _, err = executor.ExecEx(
41+
ctx,
42+
"insert-inspect-error",
43+
nil, /* txn */
44+
sessiondata.InternalExecutorOverride{
45+
User: username.NodeUserName(),
46+
QualityOfService: &sessiondatapb.BulkLowQoS,
47+
},
48+
`INSERT INTO system.inspect_errors
49+
(job_id, error_type, aost, database_id, schema_id, id, primary_key, details)
50+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
51+
c.jobID,
52+
string(issue.ErrorType),
53+
issue.AOST,
54+
issue.DatabaseID,
55+
issue.SchemaID,
56+
issue.ObjectID,
57+
issue.PrimaryKey,
58+
string(detailsBytes),
59+
); err != nil {
60+
return err
61+
}
62+
63+
return nil
64+
}
65+
66+
// hasIssues implements the inspectLogger interface.
67+
func (c *tableSink) hasIssues() bool {
68+
return c.foundIssue.Load()
69+
}

pkg/sql/inspect/table_sink_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package inspect
7+
8+
import (
9+
"context"
10+
gojson "encoding/json"
11+
"testing"
12+
"time"
13+
14+
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
17+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
18+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
19+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
20+
"github.com/cockroachdb/cockroach/pkg/util/log"
21+
"github.com/cockroachdb/redact"
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestTableSink(t *testing.T) {
26+
defer leaktest.AfterTest(t)()
27+
defer log.Scope(t).Close(t)
28+
29+
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
30+
defer s.Stopper().Stop(context.Background())
31+
runner := sqlutils.MakeSQLRunner(db)
32+
33+
const jobID jobspb.JobID = 50
34+
35+
var tableLogger inspectLogger = &tableSink{
36+
db: s.InternalDB().(descs.DB),
37+
jobID: jobID,
38+
}
39+
40+
issue := inspectIssue{
41+
ErrorType: MissingSecondaryIndexEntry,
42+
DatabaseID: 1,
43+
SchemaID: 2,
44+
ObjectID: 3,
45+
PrimaryKey: "key",
46+
Details: map[redact.RedactableString]interface{}{
47+
"foo": `"bar"`,
48+
"biz": "\u2603baz",
49+
"titi": "toto\n",
50+
},
51+
}
52+
53+
require.NoError(t, tableLogger.logIssue(context.Background(), &issue))
54+
55+
// Query the system.inspect_errors table and expect one entry
56+
var count int
57+
runner.QueryRow(t, `SELECT count(*) FROM system.inspect_errors WHERE job_id = $1`, jobID).Scan(&count)
58+
require.Equal(t, 1, count, "Expected exactly one entry in system.inspect_errors")
59+
60+
// Compare the entry against the test instance
61+
var actualJobID int64
62+
var actualErrorType, actualPrimaryKey string
63+
var actualAOST time.Time
64+
var actualDatabaseID, actualSchemaID, actualObjectID int64
65+
var actualDetailsBytes []byte
66+
67+
runner.QueryRow(t, `SELECT job_id, error_type, aost, database_id, schema_id, id, primary_key, details
68+
FROM system.inspect_errors WHERE job_id = $1`, jobID).Scan(
69+
&actualJobID, &actualErrorType, &actualAOST, &actualDatabaseID, &actualSchemaID,
70+
&actualObjectID, &actualPrimaryKey, &actualDetailsBytes)
71+
72+
require.Equal(t, int64(jobID), actualJobID, "job_id should match")
73+
require.Equal(t, string(issue.ErrorType), actualErrorType, "error_type should match")
74+
require.Equal(t, issue.AOST, actualAOST, "aost should match")
75+
require.Equal(t, int64(issue.DatabaseID), actualDatabaseID, "database_id should match")
76+
require.Equal(t, int64(issue.SchemaID), actualSchemaID, "schema_id should match")
77+
require.Equal(t, int64(issue.ObjectID), actualObjectID, "id should match")
78+
require.Equal(t, issue.PrimaryKey, actualPrimaryKey, "primary_key should match")
79+
80+
var detailsMap map[string]interface{}
81+
err := gojson.Unmarshal(actualDetailsBytes, &detailsMap)
82+
require.NoError(t, err)
83+
require.Len(t, detailsMap, 3)
84+
require.Equal(t, `"bar"`, detailsMap["foo"], "details[foo] should match")
85+
require.Equal(t, "\u2603baz", detailsMap["biz"], "details[biz] should match")
86+
require.Equal(t, "toto\n", detailsMap["titi"], "details[titi] should match")
87+
}

0 commit comments

Comments
 (0)