@@ -163,6 +163,94 @@ func setupSchemaChangePromCounter(reg prometheus.Registerer) schemaChangeCounter
163163 }
164164}
165165
166+ func (s * schemaChange ) logInspectErrors (
167+ ctx context.Context , pool * workload.MultiConnPool , log * atomicLog ,
168+ ) error {
169+ connPool := pool .Get ()
170+ conn , err := connPool .Acquire (ctx )
171+ if err != nil {
172+ log .printLn (fmt .Sprintf ("unable to acquire connection for SHOW INSPECT ERRORS: %v" , err ))
173+ return err
174+ }
175+ defer conn .Release ()
176+
177+ rows , err := conn .Query (ctx , `SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC` )
178+ if err != nil {
179+ log .printLn (fmt .Sprintf ("fetching INSPECT jobs failed: %v" , err ))
180+ return err
181+ }
182+ jobIDs , err := pgx .CollectRows (rows , pgx .RowTo [int64 ])
183+ rows .Close ()
184+ if err != nil {
185+ log .printLn (fmt .Sprintf ("collecting INSPECT job IDs failed: %v" , err ))
186+ return err
187+ }
188+ if len (jobIDs ) == 0 {
189+ return nil
190+ }
191+
192+ type InspectJobResult struct {
193+ JobID int64 `json:"jobId"`
194+ Status string `json:"status"`
195+ Errors []map [string ]any `json:"errors,omitempty"`
196+ }
197+
198+ type InspectErrorSummary struct {
199+ Message string `json:"message"`
200+ Jobs []InspectJobResult `json:"jobs"`
201+ }
202+
203+ summary := InspectErrorSummary {
204+ Message : "Inspect Job Errors" ,
205+ Jobs : make ([]InspectJobResult , 0 , len (jobIDs )),
206+ }
207+
208+ var totalErrors int
209+ for _ , jobID := range jobIDs {
210+ query := fmt .Sprintf (`
211+ SELECT error_type, database_name, schema_name, table_name, primary_key, job_id, aost, details
212+ FROM [SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS]` , jobID )
213+ rows , err := conn .Query (ctx , query )
214+ if err != nil {
215+ log .printLn (fmt .Sprintf ("%s failed: %v" , query , err ))
216+ continue
217+ }
218+ results , err := pgx .CollectRows (rows , pgx .RowToMap )
219+ rows .Close ()
220+ if err != nil {
221+ log .printLn (fmt .Sprintf ("collecting inspect errors for job %d failed: %v" , jobID , err ))
222+ continue
223+ }
224+
225+ jobResult := InspectJobResult {
226+ JobID : jobID ,
227+ }
228+
229+ if len (results ) == 0 {
230+ jobResult .Status = "no errors reported"
231+ } else {
232+ jobResult .Status = fmt .Sprintf ("%d error rows" , len (results ))
233+ jobResult .Errors = results
234+ totalErrors += len (results )
235+ }
236+
237+ summary .Jobs = append (summary .Jobs , jobResult )
238+ }
239+
240+ // Output as JSON.
241+ jsonBytes , err := json .MarshalIndent (summary , "" , " " )
242+ if err != nil {
243+ log .printLn (fmt .Sprintf ("failed to marshal inspect errors to JSON: %v" , err ))
244+ return err
245+ }
246+ log .printLn (string (jsonBytes ))
247+
248+ if totalErrors > 0 {
249+ return errors .Newf ("found %d inspect errors across %d jobs" , totalErrors , len (jobIDs ))
250+ }
251+ return nil
252+ }
253+
166254// Meta implements the workload.Generator interface.
167255func (s * schemaChange ) Meta () workload.Meta { return schemaChangeMeta }
168256
@@ -246,20 +334,22 @@ func (s *schemaChange) Ops(
246334
247335 ql := workload.QueryLoad {
248336 Close : func (_ context.Context ) error {
249- // Create a new context for shutting down the tracer provider. The
250- // provided context may be cancelled depending on why the workload is
251- // shutting down and we always want to provide a period of time to flush
252- // traces.
253- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
337+ // Create a new context for shutting down the tracer provider and logging
338+ // inspect errors. The provided context may be cancelled depending on why
339+ // the workload is shutting down and we always want to provide a period of
340+ // time to flush traces.
341+ ctx , cancel := context .WithTimeout (context .Background (), 15 * time .Second )
254342 defer cancel ()
255343
344+ inspectErr := s .logInspectErrors (ctx , pool , stdoutLog )
345+
256346 pool .Close ()
257347 watchDogPool .Close ()
258348
259349 closeErr := s .closeJSONLogFile ()
260350 shutdownErr := tracerProvider .Shutdown (ctx )
261351 s .schemaWorkloadResultAnnotator .logWorkloadStats (stdoutLog )
262- return errors .CombineErrors ( closeErr , shutdownErr )
352+ return errors .Join ( inspectErr , closeErr , shutdownErr )
263353 },
264354 }
265355
@@ -564,6 +654,9 @@ func (w *schemaChangeWorker) run(ctx context.Context) error {
564654 return err
565655 }
566656 }
657+ if _ , err := conn .Exec (ctx , "SET enable_inspect_command = true;" ); err != nil {
658+ return err
659+ }
567660
568661 tx , err := conn .Begin (ctx )
569662 if err != nil {
0 commit comments