@@ -100,6 +100,12 @@ type inspectProcessor struct {
100100 // worker goroutines.
101101 syncutil.Mutex
102102 }
103+
104+ // test, if set, supports test-only behavior.
105+ test *struct {
106+ // progress is the job progress used to persist state after a span is processed.
107+ progress *jobspb.InspectProgress
108+ }
103109}
104110
105111var _ execinfra.Processor = (*inspectProcessor)(nil)
@@ -202,6 +208,11 @@ func (p *inspectProcessor) runInspect(ctx context.Context, output execinfra.RowR
202208 return err
203209 }
204210
211+ // Run any post-checks after all spans have been processed.
212+ if err := p.processPostChecks(ctx); err != nil {
213+ return err
214+ }
215+
205216 // Send final completion message to indicate this processor is finished
206217 if err := p.sendInspectProgress(ctx, output, 0, true /* finished */); err != nil {
207218 return err
@@ -268,7 +279,7 @@ func (p *inspectProcessor) processSpan(
268279 asOfToUse := p.getTimestampForSpan()
269280
270281 // Only create checks that apply to this span.
271- var checks []inspectCheck
282+ var checks inspectChecks
272283 for _, factory := range p.checkFactories {
273284 check := factory(asOfToUse)
274285 applies, err := check.AppliesTo(p.cfg.Codec, span)
@@ -279,6 +290,16 @@ func (p *inspectProcessor) processSpan(
279290 checks = append(checks, check)
280291 }
281292 }
293+ checks.sortStable()
294+
295+ // Provide the meta-checks with a reference to all the other checks.
296+ for _, check := range checks {
297+ if metaCheck, ok := check.(inspectMetaCheck); ok {
298+ if err := metaCheck.RegisterChecks(checks); err != nil {
299+ return err
300+ }
301+ }
302+ }
282303
283304 runner := inspectRunner{
284305 checks: checks,
@@ -316,14 +337,74 @@ func (p *inspectProcessor) processSpan(
316337 }
317338 }
318339
340+ progressMsg := &jobspb.InspectProcessorProgress{
341+ ChecksCompleted: 0, // No additional checks completed, just marking span done
342+ Finished: false,
343+ }
344+
345+ for _, check := range checks {
346+ if check, ok := check.(inspectMetaCheck); ok {
347+ err := check.MetaCheck(ctx, progressMsg, p.loggerBundle)
348+ if err != nil {
349+ return err
350+ }
351+ }
352+ }
353+
319354 // Report span completion for checkpointing.
320- if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */); err != nil {
355+ if err := p.sendSpanCompletionProgress(ctx, output, span, false /* finished */, progressMsg); err != nil {
356+ return err
357+ }
358+
359+ return nil
360+ }
361+
362+ // processPostChecks runs any post-checks using the accumulated job progress.
363+ func (p *inspectProcessor) processPostChecks(ctx context.Context) (err error) {
364+ progress, err := p.loadProgress(ctx, p.spec.JobID)
365+ if err != nil {
321366 return err
322367 }
323368
369+ for _, factory := range p.checkFactories {
370+ check := factory(p.getTimestampForSpan())
371+ if postCheck, ok := check.(inspectPostCheck); ok {
372+ issues, err := postCheck.Issues(ctx, progress)
373+ if err != nil {
374+ return err
375+ }
376+ for _, issue := range issues {
377+ err = p.loggerBundle.logIssue(ctx, issue)
378+ if err != nil {
379+ return errors.Wrapf(err, "error logging inspect issue")
380+ }
381+ }
382+ }
383+ }
384+
324385 return nil
325386}
326387
388+ func (p *inspectProcessor) loadProgress(
389+ ctx context.Context, jobID jobspb.JobID,
390+ ) (*jobspb.InspectProgress, error) {
391+ if p.test != nil {
392+ return p.test.progress, nil
393+ }
394+
395+ job, err := p.cfg.JobRegistry.LoadJob(ctx, p.spec.JobID)
396+ if err != nil {
397+ return nil, err
398+ }
399+
400+ progress, ok := job.Progress().Details.(*jobspb.Progress_Inspect)
401+ if !ok {
402+ return nil, errors.New("job is not an inspect job")
403+ }
404+
405+ return progress.Inspect, nil
406+ }
407+
327408// sendInspectProgress marshals and sends inspect processor progress via BulkProcessorProgress.
328409func (p *inspectProcessor) sendInspectProgress(
329410 ctx context.Context, output execinfra.RowReceiver, checksCompleted int64, finished bool,
@@ -365,15 +446,18 @@ func (p *inspectProcessor) getTimestampForSpan() hlc.Timestamp {
365446// sendSpanCompletionProgress sends progress indicating a span has been completed.
366447// This is used for checkpointing to track which spans are done.
367448func (p *inspectProcessor) sendSpanCompletionProgress(
368- ctx context.Context, output execinfra.RowReceiver, completedSpan roachpb.Span, finished bool,
449+ ctx context.Context,
450+ output execinfra.RowReceiver,
451+ completedSpan roachpb.Span,
452+ finished bool,
453+ progressMsg *jobspb.InspectProcessorProgress,
369454) error {
370455 if p.spansProcessedCtr != nil {
371456 p.spansProcessedCtr.Inc(1)
372457 }
373- progressMsg := &jobspb.InspectProcessorProgress{
374- ChecksCompleted: 0, // No additional checks completed, just marking span done
375- Finished: finished,
376- }
458+
459+ progressMsg.ChecksCompleted = 0 // No additional checks completed, just marking span done
460+ progressMsg.Finished = finished
377461
378462 progressAny, err := pbtypes.MarshalAny(progressMsg)
379463 if err != nil {
@@ -466,7 +550,6 @@ func buildInspectCheckFactories(
466550 asOf: asOf,
467551 }
468552 })
469-
470553 default:
471554 return nil, errors.AssertionFailedf("unsupported inspect check type: %v", specCheck.Type)
472555 }
0 commit comments