@@ -7,19 +7,43 @@ package inspect
7
7
8
8
import (
9
9
"context"
10
+ "runtime"
10
11
12
+ "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
13
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
14
+ "github.com/cockroachdb/cockroach/pkg/settings"
11
15
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
12
16
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
13
17
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
14
18
"github.com/cockroachdb/cockroach/pkg/sql/types"
19
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
15
20
"github.com/cockroachdb/cockroach/pkg/util/log"
16
21
"github.com/cockroachdb/cockroach/pkg/util/tracing"
22
+ "github.com/cockroachdb/errors"
17
23
)
18
24
25
+ var (
26
+ processorConcurrencyOverride = settings .RegisterIntSetting (
27
+ settings .ApplicationLevel ,
28
+ "sql.inspect.processor_concurrency" ,
29
+ "sets the number of concurrent processors for INSPECT jobs. " +
30
+ "0 uses the default based on GOMAXPROCS. Values above GOMAXPROCS are capped." ,
31
+ 0 ,
32
+ settings .NonNegativeInt ,
33
+ )
34
+ )
35
+
36
+ type inspectCheckFactory func () inspectCheck
37
+
19
38
type inspectProcessor struct {
20
- processorID int32
21
- flowCtx * execinfra.FlowCtx
22
- spec execinfrapb.InspectSpec
39
+ processorID int32
40
+ flowCtx * execinfra.FlowCtx
41
+ spec execinfrapb.InspectSpec
42
+ cfg * execinfra.ServerConfig
43
+ checkFactories []inspectCheckFactory
44
+ spanSrc spanSource
45
+ logger inspectLogger
46
+ concurrency int
23
47
}
24
48
25
49
var _ execinfra.Processor = (* inspectProcessor )(nil )
@@ -59,21 +83,153 @@ func (p *inspectProcessor) Run(ctx context.Context, output execinfra.RowReceiver
59
83
output .ProducerDone ()
60
84
}
61
85
86
+ // runInspect starts a set of worker goroutines to process spans concurrently.
87
+ // Each span is read from a buffered channel and passed to processSpan.
88
+ // The function blocks until all spans are processed or an error occurs.
62
89
func (p * inspectProcessor ) runInspect (ctx context.Context , output execinfra.RowReceiver ) error {
63
- log .Infof (ctx , "INSPECT processor started processorID=%d" , p .processorID )
90
+ log .Infof (ctx , "INSPECT processor started processorID=%d concurrency=%d" , p .processorID , p .concurrency )
91
+
92
+ group := ctxgroup .WithContext (ctx )
93
+
94
+ if p .concurrency == 0 {
95
+ return errors .AssertionFailedf ("must have at least one worker" )
96
+ }
97
+ spanCh := make (chan roachpb.Span , p .concurrency )
98
+
99
+ // Launch worker goroutines. Each worker reads spans from spanCh and processes
100
+ // them.
101
+ for i := 0 ; i < p .concurrency ; i ++ {
102
+ workerIndex := i
103
+ group .GoCtx (func (ctx context.Context ) error {
104
+ for {
105
+ select {
106
+ case <- ctx .Done ():
107
+ // If the context is canceled (e.g., due to an error in another worker), exit.
108
+ return ctx .Err ()
109
+ case span , ok := <- spanCh :
110
+ if ! ok {
111
+ // Channel is closed, no more spans to process.
112
+ return nil
113
+ }
114
+ if err := p .processSpan (ctx , span , workerIndex ); err != nil {
115
+ // On error, return it. ctxgroup will cancel all other goroutines.
116
+ return err
117
+ }
118
+ }
119
+ }
120
+ })
121
+ }
122
+
123
+ // Producer goroutine: feeds all spans into the channel for workers to consume.
124
+ group .GoCtx (func (ctx context.Context ) error {
125
+ defer close (spanCh )
126
+ for {
127
+ span , ok , err := p .spanSrc .NextSpan (ctx )
128
+ if err != nil {
129
+ return err
130
+ }
131
+ if ! ok {
132
+ return nil // done
133
+ }
134
+ select {
135
+ case <- ctx .Done ():
136
+ // Exit early if context is canceled.
137
+ return ctx .Err ()
138
+ case spanCh <- span :
139
+ // Send span to the workers.
140
+ }
141
+ }
142
+ })
143
+
144
+ // Wait for all goroutines to finish.
145
+ return group .Wait ()
146
+ }
147
+
148
+ // getProcessorConcurrency returns the number of concurrent workers to use for
149
+ // INSPECT processing. If the cluster setting is non-zero, it uses the minimum
150
+ // of the override and GOMAXPROCS. Otherwise, it defaults to GOMAXPROCS.
151
+ func getProcessorConcurrency (flowCtx * execinfra.FlowCtx ) int {
152
+ override := int (processorConcurrencyOverride .Get (& flowCtx .Cfg .Settings .SV ))
153
+ if override > 0 {
154
+ return min (runtime .GOMAXPROCS (0 ), override )
155
+ }
156
+ return runtime .GOMAXPROCS (0 )
157
+ }
158
+
159
+ // processSpan executes all configured inspect checks against a single span.
160
+ // It instantiates a fresh set of checks from the configured factories and uses
161
+ // an inspectRunner to drive their execution.
162
+ func (p * inspectProcessor ) processSpan (
163
+ ctx context.Context , span roachpb.Span , workerIndex int ,
164
+ ) error {
165
+ checks := make ([]inspectCheck , len (p .checkFactories ))
166
+ for i , factory := range p .checkFactories {
167
+ checks [i ] = factory ()
168
+ }
169
+ runner := inspectRunner {
170
+ checks : checks ,
171
+ logger : p .logger ,
172
+ }
173
+
174
+ // Process all checks on the given span.
175
+ for {
176
+ ok , err := runner .Step (ctx , p .cfg , span , workerIndex )
177
+ if err != nil {
178
+ return err
179
+ }
180
+ if ! ok {
181
+ break
182
+ }
183
+ }
64
184
return nil
65
185
}
66
186
187
+ // newInspectProcessor constructs a new inspectProcessor from the given InspectSpec.
188
+ // It parses the spec to generate a set of inspectCheck factories, sets up the span source,
189
+ // and wires in logging and concurrency controls.
190
+ //
191
+ // The returned processor is ready for integration into a distributed flow, but will not
192
+ // begin execution until Run is called.
67
193
func newInspectProcessor (
68
194
ctx context.Context , flowCtx * execinfra.FlowCtx , processorID int32 , spec execinfrapb.InspectSpec ,
69
195
) (execinfra.Processor , error ) {
196
+ checkFactories , err := buildInspectCheckFactories (spec )
197
+ if err != nil {
198
+ return nil , err
199
+ }
70
200
return & inspectProcessor {
71
- spec : spec ,
72
- processorID : processorID ,
73
- flowCtx : flowCtx ,
201
+ spec : spec ,
202
+ processorID : processorID ,
203
+ flowCtx : flowCtx ,
204
+ checkFactories : checkFactories ,
205
+ cfg : flowCtx .Cfg ,
206
+ spanSrc : newSliceSpanSource (spec .Spans ),
207
+ // TODO(148301): log to cockroach.log for now, but later log to system.inspect_errors
208
+ logger : & logSink {},
209
+ concurrency : getProcessorConcurrency (flowCtx ),
74
210
}, nil
75
211
}
76
212
213
+ // buildInspectCheckFactories returns a slice of factory functions that create
214
+ // inspectCheck instances at runtime. Each factory corresponds to one check entry
215
+ // defined in the job's InspectSpec.
216
+ //
217
+ // This indirection ensures that each check instance is freshly created per span,
218
+ // avoiding shared state across concurrent workers.
219
+ func buildInspectCheckFactories (spec execinfrapb.InspectSpec ) ([]inspectCheckFactory , error ) {
220
+ checkFactories := make ([]inspectCheckFactory , 0 , len (spec .InspectDetails .Checks ))
221
+ for _ , specCheck := range spec .InspectDetails .Checks {
222
+ switch specCheck .Type {
223
+ case jobspb .InspectCheckIndexConsistency :
224
+ // TODO(148863): implement the index consistency checker. No-op for now.
225
+
226
+ default :
227
+ return nil , errors .AssertionFailedf ("unsupported inspect check type: %v" , specCheck .Type )
228
+ }
229
+ }
230
+ return checkFactories , nil
231
+ }
232
+
77
233
func init () {
78
234
rowexec .NewInspectProcessor = newInspectProcessor
79
235
}
0 commit comments