@@ -15,13 +15,17 @@ import (
1515
1616 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
1717 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
18+ "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
1819 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
1920 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2021 "github.com/cockroachdb/cockroach/pkg/roachprod/logger"
22+ "github.com/cockroachdb/cockroach/pkg/sql/lexbase"
23+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
2124 "github.com/cockroachdb/cockroach/pkg/testutils"
2225 "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2326 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
2427 "github.com/cockroachdb/errors"
28+ "github.com/lib/pq"
2529 "github.com/stretchr/testify/require"
2630)
2731
@@ -126,6 +130,284 @@ func CheckInvalidDescriptors(ctx context.Context, db *gosql.DB) error {
126130 return nil
127131}
128132
133+ // CheckInspectDatabase runs INSPECT DATABASE in parallel on user databases
134+ // to verify consistency. System databases (system, postgres, defaultdb) are
135+ // excluded. All INSPECT jobs run concurrently under a time budget. Once the
136+ // budget is exceeded, we check if any errors were found. And return success if
137+ // none.
138+ //
139+ // If the cluster version does not support INSPECT (requires v25.4+), the
140+ // check is skipped and returns nil without error.
141+ func CheckInspectDatabase (
142+ ctx context.Context , l * logger.Logger , db * gosql.DB , timeout time.Duration ,
143+ ) error {
144+ databases , err := discoverUserDatabases (ctx , db )
145+ if err != nil {
146+ return err
147+ }
148+ if len (databases ) == 0 {
149+ l .Printf ("No user databases found, skipping INSPECT check" )
150+ return nil
151+ }
152+
153+ l .Printf ("Found %d user databases to INSPECT: %v" , len (databases ), databases )
154+ l .Printf ("INSPECT timeout budget: %s total" , timeout )
155+
156+ if err := launchInspectJobs (ctx , l , db , databases ); err != nil {
157+ // If INSPECT is not supported due to cluster version, skip the check.
158+ if isFeatureNotSupportedError (err ) {
159+ l .Printf ("INSPECT not supported on this cluster version (requires v25.4+), skipping validation" )
160+ return nil
161+ }
162+ return err
163+ }
164+
165+ jobIDs , err := fetchInspectJobIDs (ctx , db , len (databases ))
166+ if err != nil {
167+ return err
168+ }
169+ l .Printf ("Found %d INSPECT jobs: %v" , len (jobIDs ), jobIDs )
170+
171+ waitForInspectJobCompletion (ctx , l , db , jobIDs , timeout )
172+
173+ totalErrors , errorDetails , err := collectInspectErrors (ctx , db , jobIDs )
174+ if err != nil {
175+ return err
176+ }
177+
178+ if totalErrors > 0 {
179+ return errors .Errorf ("INSPECT found %d consistency errors:%s" , totalErrors , errorDetails )
180+ }
181+
182+ l .Printf ("INSPECT validation completed successfully (0 errors across %d jobs)" , len (jobIDs ))
183+ return nil
184+ }
185+
186+ // discoverUserDatabases queries pg_database to find all user databases,
187+ // excluding the system database.
188+ func discoverUserDatabases (ctx context.Context , db * gosql.DB ) ([]string , error ) {
189+ rows , err := db .QueryContext (ctx , `
190+ SELECT datname FROM pg_database
191+ WHERE datname NOT IN ('system')
192+ ORDER BY datname` )
193+ if err != nil {
194+ return nil , errors .Wrap (err , "failed to query databases" )
195+ }
196+ defer rows .Close ()
197+
198+ var databases []string
199+ for rows .Next () {
200+ var dbName string
201+ if err := rows .Scan (& dbName ); err != nil {
202+ return nil , err
203+ }
204+ databases = append (databases , dbName )
205+ }
206+ return databases , rows .Err ()
207+ }
208+
209+ // isStatementTimeoutError returns true if the error is a statement timeout error.
210+ // Statement timeout errors are expected when launching INSPECT jobs since we only
211+ // want to start the job, not wait for it to complete.
212+ func isStatementTimeoutError (err error ) bool {
213+ var pqErr * pq.Error
214+ if errors .As (err , & pqErr ) {
215+ return pgcode .MakeCode (string (pqErr .Code )) == pgcode .QueryCanceled
216+ }
217+ return false
218+ }
219+
220+ // isFeatureNotSupportedError returns true if the error is a feature not supported error.
221+ // This can occur when the cluster version is not yet upgraded to support INSPECT.
222+ func isFeatureNotSupportedError (err error ) bool {
223+ var pqErr * pq.Error
224+ if errors .As (err , & pqErr ) {
225+ return pgcode .MakeCode (string (pqErr .Code )) == pgcode .FeatureNotSupported
226+ }
227+ return false
228+ }
229+
230+ // launchInspectJobs launches INSPECT DATABASE commands in parallel for all
231+ // provided databases using task manager for concurrency control. Each INSPECT
232+ // command enables the inspect command for that connection. Statement timeout
233+ // errors are ignored as they indicate the job was successfully started.
234+ // Feature not supported errors are returned to the caller, indicating the
235+ // cluster version does not support INSPECT.
236+ func launchInspectJobs (
237+ ctx context.Context , l * logger.Logger , db * gosql.DB , databases []string ,
238+ ) error {
239+ // Make the INSPECT jobs go as fast as possible.
240+ if _ , err := db .ExecContext (ctx ,
241+ "SET CLUSTER SETTING sql.inspect.admission_control.enabled = off" ); err != nil {
242+ return errors .Wrap (err , "failed to disable INSPECT admission control" )
243+ }
244+
245+ statementTimeout := 5 * time .Second
246+ tm := task .NewManager (ctx , l )
247+ g := tm .NewErrorGroup ()
248+
249+ for _ , dbName := range databases {
250+ dbName := dbName
251+ g .Go (func (ctx context.Context , l * logger.Logger ) error {
252+ l .Printf ("Launching INSPECT DATABASE %s" , dbName )
253+
254+ statements := []string {
255+ "SET enable_inspect_command = true" ,
256+ fmt .Sprintf ("SET statement_timeout = '%s'" , statementTimeout .String ()),
257+ fmt .Sprintf ("INSPECT DATABASE %s" , lexbase .EscapeSQLIdent (dbName )),
258+ }
259+
260+ var stmtErr error
261+ for _ , stmt := range statements {
262+ if _ , err := db .ExecContext (ctx , stmt ); err != nil {
263+ stmtErr = err
264+ break
265+ }
266+ }
267+
268+ // Always reset statement timeout back to default.
269+ if _ , err := db .ExecContext (ctx , "RESET statement_timeout" ); err != nil {
270+ l .Printf ("Warning: failed to reset statement timeout: %v" , err )
271+ }
272+
273+ // Check for errors from the statements loop.
274+ if stmtErr != nil {
275+ // Statement timeout is expected - it means the job started but didn't complete
276+ // within the timeout. The job is still running in the background.
277+ if ! isStatementTimeoutError (stmtErr ) {
278+ l .Printf ("INSPECT DATABASE %s failed to start: %v" , dbName , stmtErr )
279+ return errors .Wrapf (stmtErr , "failed to start INSPECT DATABASE %s" , dbName )
280+ }
281+ }
282+
283+ l .Printf ("INSPECT DATABASE %s started" , dbName )
284+ return nil
285+ })
286+ }
287+ return g .WaitE ()
288+ }
289+
290+ // fetchInspectJobIDs queries SHOW JOBS to retrieve the job IDs of the most
291+ // recently started INSPECT jobs.
292+ func fetchInspectJobIDs (ctx context.Context , db * gosql.DB , expectedCount int ) ([]int64 , error ) {
293+ rows , err := db .QueryContext (ctx , fmt .Sprintf (`
294+ SELECT job_id
295+ FROM [SHOW JOBS]
296+ WHERE job_type = 'INSPECT'
297+ ORDER BY started DESC
298+ LIMIT %d` , expectedCount ))
299+ if err != nil {
300+ return nil , errors .Wrap (err , "failed to query INSPECT job IDs" )
301+ }
302+ defer rows .Close ()
303+
304+ var jobIDs []int64
305+ for rows .Next () {
306+ var jobID int64
307+ if err := rows .Scan (& jobID ); err != nil {
308+ return nil , err
309+ }
310+ jobIDs = append (jobIDs , jobID )
311+ }
312+ if err := rows .Err (); err != nil {
313+ return nil , err
314+ }
315+
316+ if len (jobIDs ) == 0 {
317+ return nil , errors .New ("no INSPECT jobs were found after launching" )
318+ }
319+ return jobIDs , nil
320+ }
321+
322+ // waitForInspectJobCompletion polls SHOW JOBS until all INSPECT jobs complete
323+ // or the timeout is reached. Logs progress periodically.
324+ func waitForInspectJobCompletion (
325+ ctx context.Context , l * logger.Logger , db * gosql.DB , jobIDs []int64 , timeout time.Duration ,
326+ ) {
327+ deadline := timeutil .Now ().Add (timeout )
328+ pollInterval := 5 * time .Second
329+ logEvery := Every (30 * time .Second )
330+
331+ for timeutil .Now ().Before (deadline ) {
332+ time .Sleep (pollInterval )
333+
334+ placeholders := make ([]string , len (jobIDs ))
335+ args := make ([]interface {}, len (jobIDs ))
336+ for i , jobID := range jobIDs {
337+ placeholders [i ] = fmt .Sprintf ("$%d" , i + 1 )
338+ args [i ] = jobID
339+ }
340+
341+ var pendingCount int
342+ countQuery := fmt .Sprintf (`
343+ SELECT count(*)
344+ FROM [SHOW JOBS]
345+ WHERE job_id IN (%s)
346+ AND status NOT IN ('succeeded', 'failed', 'canceled')` ,
347+ strings .Join (placeholders , ", " ))
348+
349+ if err := db .QueryRowContext (ctx , countQuery , args ... ).Scan (& pendingCount ); err != nil {
350+ l .Printf ("Warning: failed to check job status: %v" , err )
351+ return
352+ }
353+
354+ if pendingCount == 0 {
355+ l .Printf ("All INSPECT jobs completed" )
356+ return
357+ }
358+
359+ if logEvery .ShouldLog () {
360+ l .Printf ("Waiting for %d INSPECT jobs to complete (%s remaining)" ,
361+ pendingCount , deadline .Sub (timeutil .Now ()).Round (time .Second ))
362+ }
363+ }
364+
365+ l .Printf ("INSPECT timeout reached, checking for errors anyway" )
366+ }
367+
368+ // collectInspectErrors queries SHOW INSPECT ERRORS for each job and returns
369+ // the total error count and formatted error details.
370+ func collectInspectErrors (ctx context.Context , db * gosql.DB , jobIDs []int64 ) (int , string , error ) {
371+ totalErrors := 0
372+ var errorDetails strings.Builder
373+
374+ for _ , jobID := range jobIDs {
375+ var errorCount int
376+ if err := db .QueryRowContext (ctx ,
377+ fmt .Sprintf ("SELECT count(*) FROM [SHOW INSPECT ERRORS FOR JOB %d]" , jobID ),
378+ ).Scan (& errorCount ); err != nil {
379+ return 0 , "" , errors .Wrapf (err , "failed to query INSPECT errors for job %d" , jobID )
380+ }
381+
382+ if errorCount > 0 {
383+ totalErrors += errorCount
384+ if err := func () error {
385+ rows , err := db .QueryContext (ctx , fmt .Sprintf (`
386+ SELECT database_name, schema_name, table_name, error_type, details
387+ FROM [SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS]` , jobID ))
388+ if err != nil {
389+ return errors .Wrapf (err , "failed to fetch error details for job %d" , jobID )
390+ }
391+ defer rows .Close ()
392+
393+ fmt .Fprintf (& errorDetails , "\n Job %d:" , jobID )
394+ for rows .Next () {
395+ var dbName , schemaName , tableName , errorType , details string
396+ if err := rows .Scan (& dbName , & schemaName , & tableName , & errorType , & details ); err != nil {
397+ return err
398+ }
399+ fmt .Fprintf (& errorDetails , "\n - %s.%s.%s: %s (%s)" ,
400+ dbName , schemaName , tableName , errorType , details )
401+ }
402+ return rows .Err ()
403+ }(); err != nil {
404+ return 0 , "" , err
405+ }
406+ }
407+ }
408+ return totalErrors , errorDetails .String (), nil
409+ }
410+
129411// validateTokensReturned ensures that all RACv2 tokens are returned to the pool
130412// at the end of the test.
131413func ValidateTokensReturned (
0 commit comments