@@ -483,7 +483,7 @@ func batchJobInsertStmt(
483
483
return "" , nil , nil , errors .NewAssertionErrorWithWrappedErrf (err , "failed to make timestamp for creation of job" )
484
484
}
485
485
instanceID := r .ID ()
486
- columns := []string {`id` , `created` , `status` , `claim_session_id` , `claim_instance_id` , `job_type` }
486
+ columns := []string {`id` , `created` , `status` , `claim_session_id` , `claim_instance_id` , `job_type` , `owner` , `description` }
487
487
valueFns := map [string ]func (* Job ) (interface {}, error ){
488
488
`id` : func (job * Job ) (interface {}, error ) { return job .ID (), nil },
489
489
`created` : func (job * Job ) (interface {}, error ) { return created , nil },
@@ -494,12 +494,8 @@ func batchJobInsertStmt(
494
494
payload := job .Payload ()
495
495
return payload .Type ().String (), nil
496
496
},
497
- }
498
-
499
- if schemaVersion .AtLeast (clusterversion .TODO_Delete_V25_1_AddJobsColumns .Version ()) {
500
- columns = append (columns , `owner` , `description` )
501
- valueFns [`owner` ] = func (job * Job ) (interface {}, error ) { return job .Payload ().UsernameProto .Decode ().Normalized (), nil }
502
- valueFns [`description` ] = func (job * Job ) (interface {}, error ) { return job .Payload ().Description , nil }
497
+ `owner` : func (job * Job ) (interface {}, error ) { return job .Payload ().UsernameProto .Decode ().Normalized (), nil },
498
+ `description` : func (job * Job ) (interface {}, error ) { return job .Payload ().Description , nil },
503
499
}
504
500
505
501
appendValues := func (job * Job , vals * []interface {}) (err error ) {
@@ -590,15 +586,9 @@ func (r *Registry) CreateJobWithTxn(
590
586
return errors .NewAssertionErrorWithWrappedErrf (err , "failed to construct job created timestamp" )
591
587
}
592
588
593
- cols := []string {"id" , "created" , "status" , "claim_session_id" , "claim_instance_id" , "job_type" }
594
- vals := []interface {}{jobID , created , StateRunning , s .ID ().UnsafeBytes (), r .ID (), jobType .String ()}
595
- v , err := txn .GetSystemSchemaVersion (ctx )
596
- if err != nil {
597
- return err
598
- }
599
- if v .AtLeast (clusterversion .TODO_Delete_V25_1_AddJobsColumns .Version ()) {
600
- cols = append (cols , "owner" , "description" )
601
- vals = append (vals , j .mu .payload .UsernameProto .Decode ().Normalized (), j .mu .payload .Description )
589
+ cols := []string {"id" , "created" , "status" , "claim_session_id" , "claim_instance_id" , "job_type" , "owner" , "description" }
590
+ vals := []interface {}{
591
+ jobID , created , StateRunning , s .ID ().UnsafeBytes (), r .ID (), jobType .String (), j .mu .payload .UsernameProto .Decode ().Normalized (), j .mu .payload .Description ,
602
592
}
603
593
604
594
totalNumCols := len (cols )
@@ -730,18 +720,9 @@ func (r *Registry) CreateAdoptableJobWithTxn(
730
720
}
731
721
typ := j .mu .payload .Type ().String ()
732
722
733
- cols := []string {"id" , "created" , "status" , "created_by_type" , "created_by_id" , "job_type" }
734
- placeholders := []string {"$1" , "now() at time zone 'utc'" , "$2" , "$3" , "$4" , "$5" }
735
- vals := []interface {}{jobID , StateRunning , createdByType , createdByID , typ }
736
- v , err := txn .GetSystemSchemaVersion (ctx )
737
- if err != nil {
738
- return err
739
- }
740
- if v .AtLeast (clusterversion .TODO_Delete_V25_1_AddJobsColumns .Version ()) {
741
- cols = append (cols , "owner" , "description" )
742
- placeholders = append (placeholders , "$6" , "$7" )
743
- vals = append (vals , j .mu .payload .UsernameProto .Decode ().Normalized (), j .mu .payload .Description )
744
- }
723
+ cols := []string {"id" , "created" , "status" , "created_by_type" , "created_by_id" , "job_type" , "owner" , "description" }
724
+ placeholders := []string {"$1" , "now() at time zone 'utc'" , "$2" , "$3" , "$4" , "$5" , "$6" , "$7" }
725
+ vals := []interface {}{jobID , StateRunning , createdByType , createdByID , typ , j .mu .payload .UsernameProto .Decode ().Normalized (), j .mu .payload .Description }
745
726
746
727
// Insert the job row, but do not set a `claim_session_id`. By not
747
728
// setting the claim, the job can be adopted by any node and will
@@ -1275,20 +1256,11 @@ func (r *Registry) cleanupOldJobsPage(
1275
1256
}
1276
1257
1277
1258
counts := make (map [string ]int )
1278
- for i , tbl := range jobMetadataTables {
1259
+ for _ , tbl := range jobMetadataTables {
1279
1260
var deleted int
1280
1261
if err := r .db .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
1281
1262
// Tables other than job_info -- the 0th -- are only present if the txn is
1282
1263
// running at a version that includes them.
1283
- if i > 0 {
1284
- v , err := txn .GetSystemSchemaVersion (ctx )
1285
- if err != nil {
1286
- return err
1287
- }
1288
- if v .Less (clusterversion .TODO_Delete_V25_1_AddJobsTables .Version ()) {
1289
- return nil
1290
- }
1291
- }
1292
1264
deleted , err = txn .Exec (ctx , redact .RedactableString ("gc-job-" + tbl ), txn .KV (),
1293
1265
"DELETE FROM system." + tbl + " WHERE job_id = ANY($1)" , toDelete ,
1294
1266
)
@@ -1337,17 +1309,7 @@ func (r *Registry) DeleteTerminalJobByID(ctx context.Context, id jobspb.JobID) e
1337
1309
if err != nil {
1338
1310
return err
1339
1311
}
1340
- for i , tbl := range jobMetadataTables {
1341
- if i > 0 {
1342
- v , err := txn .GetSystemSchemaVersion (ctx )
1343
- if err != nil {
1344
- return err
1345
- }
1346
- if v .Less (clusterversion .TODO_Delete_V25_1_AddJobsTables .Version ()) {
1347
- break
1348
- }
1349
- }
1350
-
1312
+ for _ , tbl := range jobMetadataTables {
1351
1313
_ , err = txn .Exec (
1352
1314
ctx , redact .RedactableString ("delete-job-" + tbl ), txn .KV (),
1353
1315
"DELETE FROM system." + tbl + " WHERE job_id = $1" , id ,
0 commit comments