Skip to content

Commit 6447ea5

Browse files
committed
sql: use ExecutorConfig in inspect logger
The inspect resumer needs to be able to log issues. The logger can use the `ExecutorConfig` embedded in the context without changing its behavior. Part of: #155472 Epic: CRDB-55075 Release note: None
1 parent cc95bdd commit 6447ea5

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

pkg/sql/inspect/index_consistency_check.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/roachpb"
1717
"github.com/cockroachdb/cockroach/pkg/security/username"
1818
"github.com/cockroachdb/cockroach/pkg/settings"
19+
"github.com/cockroachdb/cockroach/pkg/sql"
1920
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2021
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
@@ -79,7 +80,7 @@ const (
7980
type indexConsistencyCheck struct {
8081
indexConsistencyCheckApplicability
8182

82-
flowCtx *execinfra.FlowCtx
83+
execCfg *sql.ExecutorConfig
8384
indexID descpb.IndexID
8485
// tableVersion is the descriptor version recorded when the check was planned.
8586
// It is used to detect concurrent schema changes for non-AS OF inspections.
@@ -255,7 +256,7 @@ func (c *indexConsistencyCheck) Start(
255256
}
256257
}
257258

258-
if indexConsistencyHashEnabled.Get(&c.flowCtx.Cfg.Settings.SV) && len(allColNames) > 0 {
259+
if indexConsistencyHashEnabled.Get(&c.execCfg.Settings.SV) && len(allColNames) > 0 {
259260
// The hash precheck uses crdb_internal.datums_to_bytes, which depends on
260261
// keyside.Encode. Skip if any column type isn’t encodable (i.e. TSQUERY, etc.).
261262
if !allColumnsDatumsToBytesCompatible(c.columns) {
@@ -298,8 +299,8 @@ func (c *indexConsistencyCheck) Start(
298299
c.lastQueryPlaceholders = queryArgs
299300

300301
// Execute the query with AS OF SYSTEM TIME embedded in the SQL
301-
qos := getInspectQoS(&c.flowCtx.Cfg.Settings.SV)
302-
it, err := c.flowCtx.Cfg.DB.Executor().QueryIteratorEx(
302+
qos := getInspectQoS(&c.execCfg.Settings.SV)
303+
it, err := c.execCfg.DistSQLSrv.DB.Executor().QueryIteratorEx(
303304
ctx, "inspect-index-consistency-check", nil, /* txn */
304305
sessiondata.InternalExecutorOverride{
305306
User: username.NodeUserName(),
@@ -457,7 +458,7 @@ func (c *indexConsistencyCheck) Rows() uint64 {
457458
// eligible for consistency checking. If the index is valid, it stores the
458459
// descriptor and index metadata in the indexConsistencyCheck struct.
459460
func (c *indexConsistencyCheck) loadCatalogInfo(ctx context.Context) error {
460-
return c.flowCtx.Cfg.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
461+
return c.execCfg.DistSQLSrv.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
461462
if !c.asOf.IsEmpty() {
462463
if err := txn.KV().SetFixedTimestamp(ctx, c.asOf); err != nil {
463464
return err
@@ -813,8 +814,8 @@ func (c *indexConsistencyCheck) computeHashAndRowCount(
813814
query := buildIndexHashQuery(c.tableDesc.GetID(), index, columnNames, predicate)
814815
queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", query, c.asOf.AsOfSystemTime())
815816

816-
qos := getInspectQoS(&c.flowCtx.Cfg.Settings.SV)
817-
row, err := c.flowCtx.Cfg.DB.Executor().QueryRowEx(
817+
qos := getInspectQoS(&c.execCfg.Settings.SV)
818+
row, err := c.execCfg.DistSQLSrv.DB.Executor().QueryRowEx(
818819
ctx, "inspect-index-consistency-hash", nil, /* txn */
819820
sessiondata.InternalExecutorOverride{
820821
User: username.NodeUserName(),

pkg/sql/inspect/inspect_processor.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -234,16 +234,15 @@ func getProcessorConcurrency(flowCtx *execinfra.FlowCtx) int {
234234
}
235235

236236
// getInspectLogger returns a logger bundle for the inspect processor.
237-
func getInspectLogger(flowCtx *execinfra.FlowCtx, jobID jobspb.JobID) *inspectLoggerBundle {
238-
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
237+
func getInspectLogger(execCfg *sql.ExecutorConfig, jobID jobspb.JobID) *inspectLoggerBundle {
239238
metrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
240239

241240
loggers := []inspectLogger{
242241
&logSink{},
243242
&tableSink{
244-
db: flowCtx.Cfg.DB,
243+
db: execCfg.DistSQLSrv.DB,
245244
jobID: jobID,
246-
sv: &flowCtx.Cfg.Settings.SV,
245+
sv: &execCfg.Settings.SV,
247246
},
248247
&metricsLogger{
249248
issuesFoundCtr: metrics.IssuesFound,
@@ -415,15 +414,14 @@ func (p *inspectProcessor) pushProgressMeta(
415414
func newInspectProcessor(
416415
ctx context.Context, flowCtx *execinfra.FlowCtx, processorID int32, spec execinfrapb.InspectSpec,
417416
) (execinfra.Processor, error) {
418-
checkFactories, err := buildInspectCheckFactories(flowCtx, spec)
417+
execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
418+
419+
checkFactories, err := buildInspectCheckFactories(execCfg, spec)
419420
if err != nil {
420421
return nil, err
421422
}
422-
var spansProcessedCtr *metric.Counter
423-
if execCfg, ok := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig); ok {
424-
inspectMetrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
425-
spansProcessedCtr = inspectMetrics.SpansProcessed
426-
}
423+
inspectMetrics := execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics)
424+
spansProcessedCtr := inspectMetrics.SpansProcessed
427425

428426
return &inspectProcessor{
429427
spec: spec,
@@ -432,7 +430,7 @@ func newInspectProcessor(
432430
checkFactories: checkFactories,
433431
cfg: flowCtx.Cfg,
434432
spanSrc: newSliceSpanSource(spec.Spans),
435-
loggerBundle: getInspectLogger(flowCtx, spec.JobID),
433+
loggerBundle: getInspectLogger(execCfg, spec.JobID),
436434
concurrency: getProcessorConcurrency(flowCtx),
437435
clock: flowCtx.Cfg.DB.KV().Clock(),
438436
spansProcessedCtr: spansProcessedCtr,
@@ -446,7 +444,7 @@ func newInspectProcessor(
446444
// This indirection ensures that each check instance is freshly created per span,
447445
// avoiding shared state across concurrent workers.
448446
func buildInspectCheckFactories(
449-
flowCtx *execinfra.FlowCtx, spec execinfrapb.InspectSpec,
447+
execCfg *sql.ExecutorConfig, spec execinfrapb.InspectSpec,
450448
) ([]inspectCheckFactory, error) {
451449
checkFactories := make([]inspectCheckFactory, 0, len(spec.InspectDetails.Checks))
452450
for _, specCheck := range spec.InspectDetails.Checks {
@@ -460,7 +458,7 @@ func buildInspectCheckFactories(
460458
indexConsistencyCheckApplicability: indexConsistencyCheckApplicability{
461459
tableID: tableID,
462460
},
463-
flowCtx: flowCtx,
461+
execCfg: execCfg,
464462
indexID: indexID,
465463
tableVersion: tableVersion,
466464
asOf: asOf,

0 commit comments

Comments
 (0)