Skip to content

Commit 2469c6a

Browse files
committed
jobs: add invalid job ID checks to storage types
This patch aims to prevent a class of bugs resulting from jobs unintentionally passing zero job IDs when accessing the job storage tables, which could have unintended consequences. One way this could conceivably happen is as a result of not initializing a field or var that stores a job ID. Release note: None
1 parent d8e21ad commit 2469c6a

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed

pkg/jobs/job_info_storage.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (j *Job) ProgressStorage() ProgressStorage {
4747
func (i ProgressStorage) Get(
4848
ctx context.Context, txn isql.Txn,
4949
) (float64, hlc.Timestamp, time.Time, error) {
50+
if jobspb.JobID(i) == jobspb.InvalidJobID {
51+
return 0, hlc.Timestamp{}, time.Time{}, errors.AssertionFailedf("invalid job ID")
52+
}
53+
5054
ctx, sp := tracing.ChildSpan(ctx, "get-job-progress")
5155
defer sp.Finish()
5256

@@ -96,6 +100,10 @@ func (i ProgressStorage) Get(
96100
func (i ProgressStorage) Set(
97101
ctx context.Context, txn isql.Txn, fraction float64, resolved hlc.Timestamp,
98102
) error {
103+
if jobspb.JobID(i) == jobspb.InvalidJobID {
104+
return errors.AssertionFailedf("invalid job ID")
105+
}
106+
99107
ctx, sp := tracing.ChildSpan(ctx, "write-job-progress")
100108
defer sp.Finish()
101109

@@ -166,6 +174,10 @@ func (j *Job) StatusStorage() StatusStorage {
166174

167175
// Clear clears the status message row for the job, if it exists.
168176
func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
177+
if jobspb.JobID(i) == jobspb.InvalidJobID {
178+
return errors.AssertionFailedf("invalid job ID")
179+
}
180+
169181
_, err := txn.ExecEx(
170182
ctx, "clear-job-status-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
171183
`DELETE FROM system.job_status WHERE job_id = $1`, i,
@@ -176,6 +188,10 @@ func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
176188
// Sets writes the current status, replacing the current one if it exists.
177189
// Setting an empty status is the same as calling Clear().
178190
func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) error {
191+
if jobspb.JobID(i) == jobspb.InvalidJobID {
192+
return errors.AssertionFailedf("invalid job ID")
193+
}
194+
179195
ctx, sp := tracing.ChildSpan(ctx, "write-job-status")
180196
defer sp.Finish()
181197

@@ -197,6 +213,10 @@ func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) err
197213

198214
// Get gets the current status mesasge for a job, if any.
199215
func (i StatusStorage) Get(ctx context.Context, txn isql.Txn) (string, time.Time, error) {
216+
if jobspb.JobID(i) == jobspb.InvalidJobID {
217+
return "", time.Time{}, errors.AssertionFailedf("invalid job ID")
218+
}
219+
200220
ctx, sp := tracing.ChildSpan(ctx, "get-job-status")
201221
defer sp.Finish()
202222

@@ -247,6 +267,10 @@ func (j *Job) Messages() MessageStorage {
247267
// log for this job, and prunes retained messages of the same kind based on the
248268
// configured limit to keep the total number of retained messages bounded.
249269
func (i MessageStorage) Record(ctx context.Context, txn isql.Txn, kind, message string) error {
270+
if jobspb.JobID(i) == jobspb.InvalidJobID {
271+
return errors.AssertionFailedf("invalid job ID")
272+
}
273+
250274
ctx, sp := tracing.ChildSpan(ctx, "write-job-message")
251275
defer sp.Finish()
252276

@@ -279,6 +303,10 @@ type JobMessage struct {
279303
}
280304

281305
func (i MessageStorage) Fetch(ctx context.Context, txn isql.Txn) (_ []JobMessage, retErr error) {
306+
if jobspb.JobID(i) == jobspb.InvalidJobID {
307+
return nil, errors.AssertionFailedf("invalid job ID")
308+
}
309+
282310
ctx, sp := tracing.ChildSpan(ctx, "get-all-job-message")
283311
defer sp.Finish()
284312

@@ -374,6 +402,9 @@ func (i *InfoStorage) checkClaimSession(ctx context.Context) error {
374402
}
375403

376404
func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, bool, error) {
405+
if i.j.ID() == jobspb.InvalidJobID {
406+
return nil, false, errors.AssertionFailedf("invalid job ID")
407+
}
377408
if i.txn == nil {
378409
return nil, false, errors.New("cannot access the job info table without an associated txn")
379410
}
@@ -440,6 +471,9 @@ func (i InfoStorage) write(
440471
func (i InfoStorage) doWrite(
441472
ctx context.Context, fn func(ctx context.Context, job *Job, txn isql.Txn) error,
442473
) error {
474+
if i.j.ID() == jobspb.InvalidJobID {
475+
return errors.AssertionFailedf("invalid job ID")
476+
}
443477
if i.txn == nil {
444478
return errors.New("cannot write to the job info table without an associated txn")
445479
}
@@ -466,6 +500,9 @@ func (i InfoStorage) iterate(
466500
infoPrefix string,
467501
fn func(infoKey string, value []byte) error,
468502
) (retErr error) {
503+
if i.j.ID() == jobspb.InvalidJobID {
504+
return errors.AssertionFailedf("invalid job ID")
505+
}
469506
if i.txn == nil {
470507
return errors.New("cannot iterate over the job info table without an associated txn")
471508
}
@@ -589,6 +626,9 @@ func (i InfoStorage) DeleteRange(
589626

590627
// Count counts the info records in the range [start, end).
591628
func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string) (int, error) {
629+
if i.j.ID() == jobspb.InvalidJobID {
630+
return 0, errors.AssertionFailedf("invalid job ID")
631+
}
592632
if i.txn == nil {
593633
return 0, errors.New("cannot access the job info table without an associated txn")
594634
}

pkg/jobs/job_info_storage_test.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,3 +622,228 @@ func TestJobProgressAndStatusAccessors(t *testing.T) {
622622
sql.CheckQueryResults(t, fmt.Sprintf("SELECT status from system.job_status where job_id = %d", job4.ID()), [][]string{{"c"}})
623623
})
624624
}
625+
626+
func TestStorageRejectsInvalidJobID(t *testing.T) {
627+
defer leaktest.AfterTest(t)()
628+
defer log.Scope(t).Close(t)
629+
630+
ctx := context.Background()
631+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
632+
defer s.Stopper().Stop(ctx)
633+
634+
db := s.InternalDB().(isql.DB)
635+
636+
t.Run("ProgressStorage", func(t *testing.T) {
637+
progressStorage := jobs.ProgressStorage(jobspb.InvalidJobID)
638+
639+
t.Run("Get", func(t *testing.T) {
640+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
641+
_, _, _, err := progressStorage.Get(ctx, txn)
642+
require.Error(t, err)
643+
require.Contains(t, err.Error(), "invalid job ID")
644+
return nil
645+
}))
646+
})
647+
648+
t.Run("Set", func(t *testing.T) {
649+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
650+
err := progressStorage.Set(ctx, txn, 0.5, hlc.Timestamp{})
651+
require.Error(t, err)
652+
require.Contains(t, err.Error(), "invalid job ID")
653+
return nil
654+
}))
655+
})
656+
})
657+
658+
t.Run("StatusStorage", func(t *testing.T) {
659+
statusStorage := jobs.StatusStorage(jobspb.InvalidJobID)
660+
661+
t.Run("Get", func(t *testing.T) {
662+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
663+
_, _, err := statusStorage.Get(ctx, txn)
664+
require.Error(t, err)
665+
require.Contains(t, err.Error(), "invalid job ID")
666+
return nil
667+
}))
668+
})
669+
670+
t.Run("Set", func(t *testing.T) {
671+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
672+
err := statusStorage.Set(ctx, txn, "test status")
673+
require.Error(t, err)
674+
require.Contains(t, err.Error(), "invalid job ID")
675+
return nil
676+
}))
677+
})
678+
679+
t.Run("Clear", func(t *testing.T) {
680+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
681+
err := statusStorage.Clear(ctx, txn)
682+
require.Error(t, err)
683+
require.Contains(t, err.Error(), "invalid job ID")
684+
return nil
685+
}))
686+
})
687+
})
688+
689+
t.Run("MessageStorage", func(t *testing.T) {
690+
messageStorage := jobs.MessageStorage(jobspb.InvalidJobID)
691+
692+
t.Run("Record", func(t *testing.T) {
693+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
694+
err := messageStorage.Record(ctx, txn, "test", "test message")
695+
require.Error(t, err)
696+
require.Contains(t, err.Error(), "invalid job ID")
697+
return nil
698+
}))
699+
})
700+
701+
t.Run("Fetch", func(t *testing.T) {
702+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
703+
_, err := messageStorage.Fetch(ctx, txn)
704+
require.Error(t, err)
705+
require.Contains(t, err.Error(), "invalid job ID")
706+
return nil
707+
}))
708+
})
709+
})
710+
711+
t.Run("InfoStorage", func(t *testing.T) {
712+
t.Run("Get", func(t *testing.T) {
713+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
714+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
715+
_, _, err := infoStorage.Get(ctx, "test-op", "test-key")
716+
require.Error(t, err)
717+
require.Contains(t, err.Error(), "invalid job ID")
718+
return nil
719+
}))
720+
})
721+
722+
t.Run("Write", func(t *testing.T) {
723+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
724+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
725+
err := infoStorage.Write(ctx, "test-key", []byte("test-value"))
726+
require.Error(t, err)
727+
require.Contains(t, err.Error(), "invalid job ID")
728+
return nil
729+
}))
730+
})
731+
732+
t.Run("WriteFirstKey", func(t *testing.T) {
733+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
734+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
735+
err := infoStorage.WriteFirstKey(ctx, "test-key", []byte("test-value"))
736+
require.Error(t, err)
737+
require.Contains(t, err.Error(), "invalid job ID")
738+
return nil
739+
}))
740+
})
741+
742+
t.Run("Delete", func(t *testing.T) {
743+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
744+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
745+
err := infoStorage.Delete(ctx, "test-key")
746+
require.Error(t, err)
747+
require.Contains(t, err.Error(), "invalid job ID")
748+
return nil
749+
}))
750+
})
751+
752+
t.Run("DeleteRange", func(t *testing.T) {
753+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
754+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
755+
err := infoStorage.DeleteRange(ctx, "start", "end", 0)
756+
require.Error(t, err)
757+
require.Contains(t, err.Error(), "invalid job ID")
758+
return nil
759+
}))
760+
})
761+
762+
t.Run("Count", func(t *testing.T) {
763+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
764+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
765+
_, err := infoStorage.Count(ctx, "start", "end")
766+
require.Error(t, err)
767+
require.Contains(t, err.Error(), "invalid job ID")
768+
return nil
769+
}))
770+
})
771+
772+
t.Run("Iterate", func(t *testing.T) {
773+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
774+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
775+
err := infoStorage.Iterate(ctx, "prefix", func(key string, value []byte) error {
776+
return nil
777+
})
778+
require.Error(t, err)
779+
require.Contains(t, err.Error(), "invalid job ID")
780+
return nil
781+
}))
782+
})
783+
784+
t.Run("GetLast", func(t *testing.T) {
785+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
786+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
787+
err := infoStorage.GetLast(ctx, "prefix", func(key string, value []byte) error {
788+
return nil
789+
})
790+
require.Error(t, err)
791+
require.Contains(t, err.Error(), "invalid job ID")
792+
return nil
793+
}))
794+
})
795+
796+
t.Run("GetLegacyPayload", func(t *testing.T) {
797+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
798+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
799+
_, _, err := infoStorage.GetLegacyPayload(ctx, "test-op")
800+
require.Error(t, err)
801+
require.Contains(t, err.Error(), "invalid job ID")
802+
return nil
803+
}))
804+
})
805+
806+
t.Run("WriteLegacyPayload", func(t *testing.T) {
807+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
808+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
809+
err := infoStorage.WriteLegacyPayload(ctx, []byte("test-payload"))
810+
require.Error(t, err)
811+
require.Contains(t, err.Error(), "invalid job ID")
812+
return nil
813+
}))
814+
})
815+
816+
t.Run("GetLegacyProgress", func(t *testing.T) {
817+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
818+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
819+
_, _, err := infoStorage.GetLegacyProgress(ctx, "test-op")
820+
require.Error(t, err)
821+
require.Contains(t, err.Error(), "invalid job ID")
822+
return nil
823+
}))
824+
})
825+
826+
t.Run("WriteLegacyProgress", func(t *testing.T) {
827+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
828+
infoStorage := jobs.InfoStorageForJob(txn, jobspb.InvalidJobID)
829+
err := infoStorage.WriteLegacyProgress(ctx, []byte("test-progress"))
830+
require.Error(t, err)
831+
require.Contains(t, err.Error(), "invalid job ID")
832+
return nil
833+
}))
834+
})
835+
})
836+
837+
// Verify no rows were created in any of the job tables.
838+
sqlDB := sqlutils.MakeSQLRunner(s.SQLConn(t))
839+
sqlDB.CheckQueryResults(t,
840+
"SELECT count(*) FROM system.job_progress WHERE job_id = 0", [][]string{{"0"}})
841+
sqlDB.CheckQueryResults(t,
842+
"SELECT count(*) FROM system.job_progress_history WHERE job_id = 0", [][]string{{"0"}})
843+
sqlDB.CheckQueryResults(t,
844+
"SELECT count(*) FROM system.job_status WHERE job_id = 0", [][]string{{"0"}})
845+
sqlDB.CheckQueryResults(t,
846+
"SELECT count(*) FROM system.job_message WHERE job_id = 0", [][]string{{"0"}})
847+
sqlDB.CheckQueryResults(t,
848+
"SELECT count(*) FROM system.job_info WHERE job_id = 0", [][]string{{"0"}})
849+
}

0 commit comments

Comments
 (0)